## Imports


In [1]:
import yaml
from dotenv import load_dotenv

## Load Configs


In [2]:
with open("../configs/user_config.yaml") as f:
    model_config = yaml.safe_load(f)

load_dotenv("../configs/environment_variables.env")

True

## Knowledgebase Creation


### Defaults


In [3]:
INDEX_NAME = "financial_regulations"
DATA_PATH = "../data"

### Imports


In [None]:
import os

from azure.ai.generative.index import build_index
from azure.ai.resources.client import AIClient
from azure.ai.resources.operations._index_data_source import (
    ACSOutputConfig,
    LocalSource,
)
from azure.identity import DefaultAzureCredential

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient
from azure.search.documents.models import RawVectorQuery
from openai import AsyncAzureOpenAI

### Functions


In [None]:
def build_cogsearch_index(
    index_name: str,
    path_to_data: str,
    chunk_size: int,
    chunk_overlap: int,
    data_source_url: str = None,
):
    # Set up environment variables for cog search SDK
    os.environ["AZURE_COGNITIVE_SEARCH_TARGET"] = os.environ.get(
        "AZURE_AI_SEARCH_ENDPOINT", ""
    )
    os.environ["AZURE_COGNITIVE_SEARCH_KEY"] = os.environ.get("AZURE_AI_SEARCH_KEY", "")

    client = AIClient.from_config(DefaultAzureCredential())

    default_aoai_connection = client.get_default_aoai_connection()
    default_aoai_connection.set_current_environment()

    default_acs_connection = client.connections.get(
        os.environ.get("AZURE_COGNITIVE_SEARCH_CONNECTION_NAME", "")
    )
    default_acs_connection.set_current_environment()

    # Use the same index name when registering the index in AI Studio
    index = build_index(
        output_index_name=index_name,
        vector_store=os.environ.get("VECTOR_STORE", ""),
        embeddings_model=f"azure_open_ai://deployment/{os.environ.get('AZURE_OPENAI_EMBEDDING_DEPLOYMENT')}/model/{os.environ.get('AZURE_OPENAI_EMBEDDING_MODEL')}",
        data_source_url=data_source_url,
        index_input_config=LocalSource(input_data=path_to_data),
        acs_config=ACSOutputConfig(
            acs_index_name=index_name,
        ),
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )

    # register the index so that it shows up in the project
    cloud_index = client.indexes.create_or_update(index)

    print(f"Created index '{cloud_index.name}'")
    print(f"Local Path: {index.path}")
    print(f"Cloud Path: {cloud_index.path}")

### Ingest Documents


In [None]:
build_cogsearch_index(
    index_name=INDEX_NAME,
    path_to_data=DATA_PATH,
    chunk_size=model_config["rag"]["chunk_size"],
    chunk_overlap=model_config["rag"]["chunk_overlap"],
)

## Chat with Documents


### Defaults


In [None]:
INDEX_NAME = "financial_regulations"

### Imports


In [4]:
import os

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient
from azure.search.documents.models import RawVectorQuery
from openai import AsyncAzureOpenAI

import asyncio
from typing import List

import nest_asyncio
from openai import AzureOpenAI

nest_asyncio.apply()

### Functions


In [5]:
async def get_documents(
    question: str,
    index_name: str,
    num_docs=5,
) -> str:
    #  retrieve documents relevant to the user's question from Cognitive Search
    search_client = SearchClient(
        endpoint=os.environ.get("AZURE_AI_SEARCH_ENDPOINT", ""),
        credential=AzureKeyCredential(os.environ.get("AZURE_AI_SEARCH_KEY", "")),
        index_name=index_name,
    )

    async with AsyncAzureOpenAI(
        azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
        api_key=os.environ.get("AZURE_OPENAI_KEY", ""),
        api_version=os.environ.get("AZURE_OPENAI_API_VERSION", ""),
    ) as aclient:

        # generate a vector embedding of the user's question
        embedding = await aclient.embeddings.create(
            input=question, model=os.environ.get("AZURE_OPENAI_EMBEDDING_DEPLOYMENT")
        )
        embedding_to_query = embedding.data[0].embedding

    context = ""
    contexts = []
    async with search_client:
        # use the vector embedding to do a vector search on the index
        vector_query = RawVectorQuery(
            vector=embedding_to_query, k=num_docs, fields="contentVector"
        )
        results = await search_client.search(
            search_text="", vector_queries=[vector_query], select=["id", "content"]
        )

        async for result in results:
            context += f"\n>>> {result['content']}"
            contexts.append(result["content"])

    return context, contexts

In [6]:
def build_message(user_prompt: str, system_role: str) -> List[dict]:
    return [
        {"role": "system", "content": system_role},
        {"role": "user", "content": user_prompt},
    ]


def chat_completion(
    question: str,
    system_role: str,
    user_prompt: str,
    index_name: str,
    num_docs: int = 5,
    temperature: float = 0.7,
    max_tokens: int = 800,
):
    # get search documents for the last user message in the conversation
    context, contexts = asyncio.run(
        get_documents(
            question=question,
            index_name=index_name,
            num_docs=num_docs,
        )
    )

    # TODO: Add context to user message
    user_prompt = user_prompt.format(question=question, context=context)
    message = build_message(user_prompt=user_prompt, system_role=system_role)

    with AzureOpenAI(
        azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
        api_key=os.environ.get("AZURE_OPENAI_KEY", ""),
        api_version=os.environ.get("AZURE_OPENAI_API_VERSION", ""),
    ) as client:

        # call Azure OpenAI with the system prompt and user's question
        chat_completion = client.chat.completions.create(
            model=os.environ.get("AZURE_OPENAI_CHAT_DEPLOYMENT"),
            messages=message,
            temperature=temperature,
            max_tokens=max_tokens,
        )

    response = {
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": chat_completion.choices[0].message.content,
                },
            }
        ]
    }

    # add context in the returned response
    context_dict = {
        "context": context,
        "contexts": contexts,
        "num_docs": num_docs,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    response["choices"][0]["context"] = context_dict
    return response

In [7]:
def chat_with_documents(question: str):
    result = chat_completion(
        question=question,
        system_role=model_config["prompt"]["system_role"],
        user_prompt=model_config["prompt"]["user_prompt"]
        + "\n\nQuestion:'{question}' \n\nContext: '{context}'",
        index_name=INDEX_NAME,
        num_docs=model_config["rag"]["num_docs"],
        temperature=model_config["model"]["temperature"],
        max_tokens=model_config["model"]["max_tokens"],
    )
    print(result["choices"][0]["message"]["content"])

### Question and Answering on the Data


In [8]:
chat_with_documents(
    question="What happens if a Power Unit Manufacturer does not submit the Full Year Reporting Documentation by the deadline? ",
)

If a Power Unit Manufacturer does not submit the Full Year Reporting Documentation by the deadline, the Cost Cap Administration will issue a late submission notice to the manufacturer. The manufacturer will then have 48 hours to provide a written explanation for the late submission. The Cost Cap Administration may grant an extension to the Full Year Reporting Deadline if satisfied with the explanation. However, if the manufacturer does not provide a written response within the specified time, provides an unsatisfactory response, or fails to submit the documentation by the Extended Reporting Deadline, they will have committed a Non-Submission Breach and will be referred to the Cost Cap Adjudication Panel. The panel may impose penalties such as Constructors' Championship points deduction, financial penalties, and other sporting penalties.
