In [1]:
# to use asynchronous functions or methods in jupyter notebook
import nest_asyncio

nest_asyncio.apply()
# allows using the running event loop
import asyncio

## Mitigation via Completion


### Schema Definations


In [352]:
## defining function schemas

from typing import Union, Literal
from pydantic import BaseModel, Field


# NonToolAnswer schema will be used for verifying the output
class NonToolAnswer(BaseModel):
    answer: str


avaialle_non_tool_answering = [{
    "name": "Answer",
    "description": "Answer the user message without being verbose.",
    "parameters": NonToolAnswer.model_json_schema(),
}]

function_names = [
    "Answer",
    "MakeCall",
    "CallInteraction",
    "SendMessage",
    "AudioInteraction",
    "MapInteraction",
]


class SearchTool(BaseModel):
    q: str = Field(..., description="Search Query.")


class UseContact(BaseModel):
    is_number: bool = Field(
        ...,
        description=
        "True if a number is specified by the user and False if the user provides contact name.",
    )
    number: Union[str, None] = Field(
        ..., description="The number if provided by the user else None.")
    contact_name: Union[str, None] = Field(
        ...,
        description=
        "If a number is not provided the user will provide the name of the contact to call.",
    )


class MakeCall(BaseModel):
    app_name: Literal["Phone", "WhatsApp"] = Field(
        default="Phone",
        description="Specify the application name provided by the user.",
    )
    meta: UseContact


class CallInteraction(BaseModel):
    interaction_type: Literal["ACCEPT", "REJECT"] = Field(
        ..., description="The user can ask to accept or reject the call.")


class SendMessage(BaseModel):
    app_name: Literal["Phone", "WhatsApp"] = Field(
        default="Phone",
        description="Specify the application name provided by the user.",
    )
    meta: UseContact
    message_text: str = Field(
        ..., description="The message the user wants to send.")


class AudioInteraction(BaseModel):
    action: Literal["Select", "Play", "Pause"] = Field(
        ...,
        description=
        "The user can either select a song to play, pause a song, or play a paused song",
    )
    is_select: bool = Field(
        ..., description="True if a user is asking to play a specific song.")
    song_name: Union[str, None] = Field(
        ...,
        description="Name of the song in case `is_select` is true else None.")


class Stop(BaseModel):
    action: Literal["Add", "Remove"] = Field(
        ..., description="The user can ask to add or remove a stop.")
    name: str = Field(..., description="Name of the stop.")


class MapInteraction(BaseModel):
    action: Literal["Start", "Update"] = Field(
        ...,
        description=
        "The user can start the map action by saying where they want to go. Or else they can provide an update to add or remove a stop.",
    )
    is_update: bool = Field(
        ...,
        description=
        "If the user is asking to add or remove a stop then it's update.",
    )
    stop: Union[Stop, None] = Field(
        ...,
        description=
        "The stop details are required if `is_update` is true else it can be None",
    )


available_domain_specific_tools = [
    {
        "name": "MakeCall",
        "description": "Used to make a call.",
        "parameters": MakeCall.model_json_schema(),
    },
    {
        "name": "CallInteraction",
        "description": "Used to interact with an incoming call.",
        "parameters": CallInteraction.model_json_schema(),
    },
    {
        "name": "SendMessage",
        "description": "Used to send a message.",
        "parameters": SendMessage.model_json_schema(),
    },
    {
        "name": "AudioInteraction",
        "description":
        "Used to interact with audio system like select music, pause music, or play a paused music",
        "parameters": AudioInteraction.model_json_schema(),
    },
    {
        "name": "MapInteraction",
        "description": "Used to set or update location in the map",
        "parameters": MapInteraction.model_json_schema(),
    },
]

available_search_tool_functions = [{
    "name":
    "Search",
    "description":
    "Search for the user query",
    "parameters":
    SearchTool.model_json_schema(),
}]

### LLM Completion Utilities


#### Context Management with Sliding Window


In [353]:
import tiktoken
from typing import List, Dict


class MessageManagement:
    """Removes tokens from start or end and provides string with max token lenght provided"""

    def __init__(
        self,
        model_name: str = "gpt-4-turbo-preview",
        encoding_name: Union[str, None] = None,
    ):
        try:
            self.encoding = tiktoken.encoding_for_model(model_name)
        except KeyError:
            self.encoding = tiktoken.get_encoding(encoding_name)

    def __count_tokens__(self, content: str):
        tokens = self.encoding.encode(content)
        return len(tokens) + 4

    def __pad_message__(self, content: str, num_tokens: int):
        tokens = self.encoding.encode(content)
        return self.encoding.decode(tokens[:num_tokens])

    def __call__(self, messages: List[Dict], max_length: int = 14_000):
        system_prompt = list(
            filter(lambda message: message.get("role") == "system", messages))
        other_messages = list(
            filter(lambda message: message.get("role") != "system", messages))

        managed_messages = []

        curr_length = 0
        if len(system_prompt) == 1:
            curr_length += self.__count_tokens__(
                system_prompt[0].get("content"))
        for message in other_messages[::-1]:
            if message.get("role") == "system":
                managed_messages += message
            else:
                lgth = self.__count_tokens__(message.get("content"))
                if curr_length + lgth >= max_length:
                    tokens_to_keep = max_length - curr_length
                    # print(f"TOKENS TO KEEP: ", tokens_to_keep)
                    if tokens_to_keep > 0:
                        padded_message = self.__pad_message__(
                            message.get("content"), tokens_to_keep)
                        message["content"] = padded_message
                        managed_messages.append(message)
                        curr_length += tokens_to_keep
                        break
                    else:
                        break
                else:
                    managed_messages.append(message)
                    curr_length += lgth
        managed_messages = system_prompt + managed_messages[::-1]
        return managed_messages

In [396]:
import os

os.environ["OPENAI_API_KEY"] = "YOUR OPENAI API KEY"

from abc import ABC, abstractmethod
from typing import List, Dict, Union
from openai import AsyncOpenAI, RateLimitError, APIConnectionError
import backoff
import numpy as np


class BaseLLM(ABC):

    def __init__(self, **kwargs):
        self.client = None

    @abstractmethod
    async def __complete__(self, messages: List[Dict], model: str, **kwargs):
        pass

    @abstractmethod
    async def __stream__(self, messages: List[Dict], model: str, **kwargs):
        pass

    @abstractmethod
    async def __function_call__(self, messages: List[Dict], model: str,
                                **kwargs):
        pass

In [37]:
FUNCTION_CALLING_SYSTEM_PROMPT = """You are a helpful assistant with access to the following functions:

    {functions}

    To use these functions respond with:
    <multiplefunctions>
        <functioncall> {{fn}} </functioncall>
        <functioncall> {{fn}} </functioncall>
        ...
    </multiplefunctions>

    Edge cases you must handle:
    - If there are no functions that match the user request, you will respond politely that you cannot help.<|im_end|>

    Refer the below provided output example for function calling
    Question: What's the weather difference in NY and LA?
    <multiplefunctions>
        <functioncall> {{"name": "getWeather", "parameters": {{"city": "NY"}}}} </functioncall>
        <functioncall> {{"name": "getWeather", "parameters": {{"city": "LA"}}}} </functioncall>
    </multiplefunctions>

    Note: You can even select only <functioncall> inside <multiplefunctions> block if needed.
    """

In [354]:
import re
import json
import logging
from xml.etree import ElementTree as ET


def extractUsingRegEx(output_text: str):
    pattern = r"<functioncall>\s*(\{.*?\})\s*</functioncall>"
    matches = re.findall(pattern, output_text, re.DOTALL)
    logging.info(f"Exception block Matches: {matches}")

    results = []
    for json_string in matches:
        try:
            json_data = json.loads(json_string)
            results.append(json_data)
        except json.JSONDecodeError as err:
            print(f"Error decoding JSON: {str(err)}")
            continue
    return results

In [259]:
class OpenAILLM(BaseLLM):

    def __init__(self, **kwargs):
        self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        self.ctx = MessageManagement(kwargs.get("model"),
                                     kwargs.get("encoding_name", None))

    async def __complete__(self, messages: List[Dict], model: str, **kwargs):
        managed_messages = self.ctx(messages, 110_000)
        output = await self.client.chat.completions.create(
            messages=managed_messages, model=model, **kwargs)
        usage = output.usage.__dict__
        output_content = output.choices[0].message.content
        if "logprobs" in kwargs:
            return (
                output_content,
                output.choices[0].logprobs.content[0].top_logprobs,
                usage,
            )
        return output_content, None, usage

    async def __stream__(self, messages: List[Dict], model: str, **kwargs):
        managed_messages = self.ctx(messages, 110_000)
        stream = await self.client.chat.completions.create(
            model=model, messages=managed_messages, stream=True, **kwargs)
        async for chunk in stream:
            yield chunk.choices[0].delta.content or ""

    async def __function_call__(self, messages: List[Dict], model: str,
                                tools: List[Dict], **kwargs):
        system_message = list(
            filter(lambda message: message.get("role") == "system", messages))
        if len(system_message) > 0:
            system_message = system_message[0]
            system_message["content"] = (
                FUNCTION_CALLING_SYSTEM_PROMPT.format(functions=tools) +
                "\n\n" + "Task: " + "\n\n" + system_message.get("content"))

        else:
            system_message = {
                "role": "system",
                "content":
                FUNCTION_CALLING_SYSTEM_PROMPT.format(functions=tools),
            }
        non_system_messages = list(
            filter(lambda message: message.get("role") != "system", messages))
        messages = [system_message] + non_system_messages
        output_content, _, usage = await self.__complete__(messages, model)
        function_calls = extractUsingRegEx(output_content)
        if function_calls:
            return True, function_calls, usage
        print(f"OUTPUT CONTENT: ", output_content)
        return False, output_content, usage

#### Example Interaction


In [260]:
messages = [{"role": "user", "content": "I want to call my Mom"}]

In [355]:
llm = OpenAILLM(model="gpt-4o", encoding_name="o200k_base")

In [262]:
output = asyncio.run(
    llm.__function_call__(messages, "gpt-4o", available_domain_specific_tools))
print(output)

OUTPUT CONTENT:  Which app would you like to use to call your Mom, Phone or WhatsApp?
(False, 'Which app would you like to use to call your Mom, Phone or WhatsApp?', {'completion_tokens': 17, 'prompt_tokens': 1368, 'total_tokens': 1385})


In [263]:
messages.append({"role": "assistant", "content": output[1]})
messages.append({
    "role": "user",
    "content": "Contact name is 'Mom' and call her using Phone"
})
output = asyncio.run(
    llm.__function_call__(messages, "gpt-4o", available_domain_specific_tools))
print(output)

(True, [{'name': 'MakeCall', 'parameters': {'app_name': 'Phone', 'meta': {'is_number': False, 'number': None, 'contact_name': 'Mom'}}}], {'completion_tokens': 58, 'prompt_tokens': 1404, 'total_tokens': 1462})


## Mitigation Layers


### Inherent knowledge verification


In [356]:
CAN_ANSWER_WITH_INHERENT_KNOWLEDGE = """You are a helpful assistant as part of car infotainment system. The driver or passengers can ask you to perform specific tasks or can ask you about certain things.
Based on the what they are asking you have to decide if you can answer that directly or not. You just have to reply with a YES or NO and nothing else.
When a user asks for specific tasks like calling someone, playing music, setting up navigation, etc. which you cannot do directly you should reply with NO.
You only have the world knowledge up until October of 2023. You don't have any current affairs knowledge about October 2023.
Today's date is {date} (dd-mm-yyyy).
"""

In [357]:
CLASSIFICATION_THRESHOLD = 0.82
from datetime import datetime


def calculateLinearProbability(logprob):
    return np.round(np.exp(logprob.logprob) * 100)


async def canUseInherentKnowledge(llm: BaseLLM, model: str,
                                  messages: List[Dict]):
    messages = [{
        "role":
        "system",
        "content":
        CAN_ANSWER_WITH_INHERENT_KNOWLEDGE.format(
            date=datetime.today().strftime("%d-%m-%Y")),
    }] + messages
    output = await llm.__complete__(messages,
                                    model,
                                    logprobs=True,
                                    top_logprobs=1,
                                    seed=42,
                                    temperature=0.2)
    # print(output)
    output_label, logprobs, usage = output
    linear_probability = calculateLinearProbability(logprobs[0])
    if linear_probability > CLASSIFICATION_THRESHOLD:
        return output_label, usage
    return None, usage

In [266]:
print(
    asyncio.run(
        canUseInherentKnowledge(
            llm,
            "gpt-4o",
            [{
                "role":
                "user",
                "content":
                "Which team won the FIFA 2022 world cup and who was the captain?",
            }],
        )))

('YES', [TopLogprob(token='YES', bytes=[89, 69, 83], logprob=-0.00012035091)], {'completion_tokens': 1, 'prompt_tokens': 166, 'total_tokens': 167})
('YES', {'completion_tokens': 1, 'prompt_tokens': 166, 'total_tokens': 167})


In [267]:
print(
    asyncio.run(
        canUseInherentKnowledge(
            llm,
            "gpt-4o",
            [{
                "role": "user",
                "content": "Who's the current captain of Indian cricket team?",
            }],
        )))

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-0.004088728)], {'completion_tokens': 1, 'prompt_tokens': 159, 'total_tokens': 160})
('NO', {'completion_tokens': 1, 'prompt_tokens': 159, 'total_tokens': 160})


In [268]:
print(
    asyncio.run(
        canUseInherentKnowledge(llm, "gpt-4o", [{
            "role": "user",
            "content": "Call my mom"
        }])))

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-2.220075e-06)], {'completion_tokens': 1, 'prompt_tokens': 153, 'total_tokens': 154})
('NO', {'completion_tokens': 1, 'prompt_tokens': 153, 'total_tokens': 154})


### Answering with Inherent knowledge


In [358]:
ANSWER_WITH_INHERENT_KNOWLEDGE_PROMPT = """You are a helpful assistant as part of car infotainment system. You have to answer the user's generic questions or queries in the format mentioned in the function schema."""


async def answerWithInherentKnowledge(llm: BaseLLM, model: str,
                                      messages: List[Dict]):
    messages = [{
        "role": "system",
        "content": ANSWER_WITH_INHERENT_KNOWLEDGE_PROMPT
    }] + messages
    function_call_available, function_call, usage = await llm.__function_call__(
        messages, model, avaialle_non_tool_answering, seed=42, temperature=0.2)
    if function_call_available:
        return function_call, usage
    return None, usage

In [270]:
print(
    asyncio.run(
        answerWithInherentKnowledge(
            llm,
            "gpt-4o",
            [{
                "role":
                "user",
                "content":
                "Which team won the FIFA 2022 world cup and who was the captain?",
            }],
        )))

([{'name': 'Answer', 'parameters': {'answer': 'Argentina won the FIFA 2022 World Cup, and the captain was Lionel Messi.'}}], {'completion_tokens': 40, 'prompt_tokens': 319, 'total_tokens': 359})


### Search Tool Check and Search functionality


In [359]:
VERIFY_SEARCH_TOOL_NEEDED = """You are a helpful assistant as part of car infotainment system. The driver or passengers can ask you to perform specific tasks or can ask you about certain things.
Based on the what they are asking you have to decide if you can answer that directly or not. You just have to reply with a YES or NO and nothing else.
When a user asks for specific tasks like calling someone, playing music, setting up navigation, etc. which you cannot do directly you should reply with NO.
Only if the task requires searching the web you have to reply with YES.
You only have the world knowledge up until October of 2023. 
You don't have any current affairs knowledge about October 2023. You need to search for events after October 2023.
Today's date is {date} (dd-mm-yyyy)."""


async def verifySearchRequired(llm: BaseLLM, model: str, messages: List[Dict]):
    messages = [{
        "role":
        "system",
        "content":
        VERIFY_SEARCH_TOOL_NEEDED.format(
            date=datetime.today().strftime("%d-%m-%Y")),
    }] + messages
    output = await llm.__complete__(messages,
                                    model,
                                    logprobs=True,
                                    top_logprobs=1,
                                    seed=42,
                                    temperature=0.2)
    output_label, logprobs, usage = output
    # print(f"SEARCH REQUIRED LABEL: ", output_label)
    linear_probability = calculateLinearProbability(logprobs[0])
    if linear_probability > CLASSIFICATION_THRESHOLD:
        return output_label, usage
    return None, usage

In [331]:
print(
    asyncio.run(
        verifySearchRequired(
            llm,
            "gpt-4o",
            [{
                "role": "user",
                "content": "Where can I eat Chinese food near me?"
            }],
        )))

SEARCH REQUIRED LABEL:  YES
('YES', {'completion_tokens': 1, 'prompt_tokens': 187, 'total_tokens': 188})


In [332]:
print(
    asyncio.run(
        verifySearchRequired(llm, "gpt-4o", [{
            "role": "user",
            "content": "Call my mom"
        }])))

SEARCH REQUIRED LABEL:  NO
('NO', {'completion_tokens': 1, 'prompt_tokens': 181, 'total_tokens': 182})


In [333]:
print(
    asyncio.run(
        verifySearchRequired(llm, "gpt-4o", [{
            "role": "user",
            "content": "Navigate to Home"
        }])))

SEARCH REQUIRED LABEL:  NO
('NO', {'completion_tokens': 1, 'prompt_tokens': 181, 'total_tokens': 182})


In [334]:
print(
    asyncio.run(
        verifySearchRequired(llm, "gpt-4o",
                             [{
                                 "role": "user",
                                 "content": "Who won the Superbowl?"
                             }])))

SEARCH REQUIRED LABEL:  YES
('YES', {'completion_tokens': 1, 'prompt_tokens': 184, 'total_tokens': 185})


#### Brave Search


In [360]:
import httpx

os.environ["BRAVE_API_KEY"] = "YOUR BRAVE API KEY"

BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY")


async def brave_search(search_term):
    brave_api_key = BRAVE_API_KEY
    url = f"https://api.search.brave.com/res/v1/web/search?q={search_term}&count=3"
    headers = {
        "X-Subscription-Token": brave_api_key,
        "Accept": "application/json"
    }

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            # print('Search Data: ', data)
            return format_search(data)
        print(await response.text())
        return None


def format_search(search_results):
    retrieve_keys = ["web", "news"]
    formatted_results = []
    for value in retrieve_keys:
        if value in search_results:
            results = search_results[value]["results"]
            formatted_results.append("\n".join(
                f"Title: {result['title']} Description: {result['description']} URL: {result['url']}"
                for result in results))
    # print('Formatted Results: ', formatted_results)
    if formatted_results:
        return "\n".join(formatted_results)
    return None

In [361]:
SEARCH_RESPONSE_PROMPT = """You are a helpful assistant as part of car infotainment system. You are provided with a set of search results in triple backticks based on that you have to answer for the user question without being verbose in the function schema defined."""


async def searchGenAnswer(llm: BaseLLM, model: str, messages: List[Dict]):
    search_query = messages[-1].get("content")
    search_results = await brave_search(search_query)
    if not search_results:
        return False, "Unable to search! Please try again later!"
    messages[-1]["content"] += ("\n\n" + "Search results: " + "\n" +
                                f"```{search_results}```")
    messages = [{
        "role": "system",
        "content": SEARCH_RESPONSE_PROMPT
    }] + messages
    function_call_available, function_call, usage = await llm.__function_call__(
        messages, model, avaialle_non_tool_answering, seed=42, temperature=0.2)
    if function_call_available:
        return True, function_call, usage
    return False, "Unable to search! Please try again later!", usage

In [278]:
print(
    asyncio.run(
        searchGenAnswer(
            llm,
            "gpt-4o",
            [{
                "role": "user",
                "content": "Who won the superbowl in 2020?"
            }],
        )))

(True, [{'name': 'Answer', 'parameters': {'answer': 'Kansas City Chiefs'}}], {'completion_tokens': 26, 'prompt_tokens': 645, 'total_tokens': 671})


In [279]:
print(
    asyncio.run(
        searchGenAnswer(llm, "gpt-4o", [{
            "role": "user",
            "content": "Who won the UCL in 2023?"
        }])))

(True, [{'name': 'Answer', 'parameters': {'answer': 'Manchester City'}}], {'completion_tokens': 21, 'prompt_tokens': 642, 'total_tokens': 663})


In [280]:
print(
    asyncio.run(
        searchGenAnswer(llm, "gpt-4o", [{
            "role": "user",
            "content": "Who won the UCL in 2022?"
        }])))

(True, [{'name': 'Answer', 'parameters': {'answer': 'Real Madrid'}}], {'completion_tokens': 25, 'prompt_tokens': 721, 'total_tokens': 746})


### Defined Function/Tool Call based on the available actions


In [362]:
DEFINED_TOOL_CALL = """You are a helpful assistant as part of car infotainment system. The driver or passengers can ask you to perform specific tasks or can ask you about certain things.
You can take the following actions
* Interact with a call i.e. ACCEPT or REJECT a call.
* Make a call.
* Interact with Audio system i.e. Select and track to play, pause a track, or play a track.
* Interact with navigation/map system i.e. start navigation to the location or update stops on the way.
A user can even ask for multiple tasks to be done at once. The output of the task info should adhere to defined function schemas.
"""

In [363]:
async def callDefinedAction(llm: BaseLLM, model: str, messages: List[Dict]):
    messages = [{"role": "system", "content": DEFINED_TOOL_CALL}] + messages
    function_call_available, function_call, usage = await llm.__function_call__(
        messages,
        model,
        available_domain_specific_tools,
        seed=42,
        temperature=0.2)
    if function_call_available:
        return function_call, usage
    return None, usage

In [283]:
print(
    asyncio.run(
        callDefinedAction(
            llm,
            "gpt-4o",
            [
                {
                    "role": "assistant",
                    "content": "You are getting a call from +918878900",
                },
                {
                    "role":
                    "user",
                    "content":
                    "Reject the call and send message that I'm driving cannot talk right now will catchup later.",
                },
            ],
        )))

([{'name': 'CallInteraction', 'parameters': {'interaction_type': 'REJECT'}}, {'name': 'SendMessage', 'parameters': {'app_name': 'Phone', 'meta': {'is_number': True, 'number': '+918878900', 'contact_name': None}, 'message_text': "I'm driving cannot talk right now will catchup later."}}], {'completion_tokens': 101, 'prompt_tokens': 1530, 'total_tokens': 1631})


### Completion with Mitigation


In [364]:
from rich.console import Console
from rich.theme import Theme

custom_theme = Theme({
    "info": "dim cyan",
    "warning": "magenta",
    "error": "bold red",
    "prompt": "dim cyan",
    "user_input": "bold green",
    "assistant": "bold blue",
})
console = Console(theme=custom_theme)

In [365]:
from time import time


async def mitigateAndComplete(llm: BaseLLM, model: str, messages: List[Dict]):
    st_time = time()
    answer = -1
    token_usage = {
        "completion_tokens": 0,
        "prompt_tokens": 0,
        "total_tokens": 0
    }

    def updateUsage(usage):
        for k, v in usage.items():
            token_usage[k] += v

    # console.print("User Input: ", messages[-1].get("content"), style="user_input")
    reply_with_inherent_knowledge, usage = await canUseInherentKnowledge(
        llm, model, messages)
    updateUsage(usage)
    if reply_with_inherent_knowledge == "YES":
        # console.print("Replying with Inherent Knowledge", style="info")
        answer, usage = await answerWithInherentKnowledge(llm, model, messages)
        updateUsage(usage)
        answer = answer
        # if answer:
        #     return answer
        # return -1
    elif reply_with_inherent_knowledge == "NO":
        need_search, usage = await verifySearchRequired(llm, model, messages)
        updateUsage(usage)
        if need_search == "YES":
            # console.print("Search to reply", style="info")
            search_status, answer, usage = await searchGenAnswer(
                llm, model, messages)
            updateUsage(usage)
            if search_status:
                # return answer
                answer = answer
            # else:
            #     # console.print("Unable to search", style="info")
            #     # return -1
        elif need_search == "NO":
            # console.print("Calling defined Action", style="info")
            defined_action, usage = await callDefinedAction(
                llm, model, messages)
            updateUsage(usage)
            answer = defined_action if defined_action else -1
            # return defined_action if defined_action else -1
        # else:
        #     return -1
    # else:
    #     return -1
    latency = time() - st_time
    return answer, token_usage, f"{latency:.4f}"

In [288]:
console.print(
    asyncio.run(
        mitigateAndComplete(
            llm,
            "gpt-4o",
            [{
                "role":
                "user",
                "content":
                "Who was the captain of the Indian cricket team in 2018?",
            }],
        )),
    style="assistant",
)

('YES', [TopLogprob(token='YES', bytes=[89, 69, 83], logprob=-7.107425e-06)], {'completion_tokens': 1, 'prompt_tokens': 164, 'total_tokens': 165})


In [289]:
console.print(
    asyncio.run(
        mitigateAndComplete(
            llm,
            "gpt-4o",
            [{
                "role":
                "user",
                "content":
                "Send a message to contact Mom via WhatsApp that I won't be able to make for dinner",
            }],
        )),
    style="assistant",
)

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-2.1008714e-06)], {'completion_tokens': 1, 'prompt_tokens': 168, 'total_tokens': 169})
SEARCH REQUIRED LABEL:  NO


In [290]:
console.print(
    asyncio.run(
        mitigateAndComplete(llm, "gpt-4o",
                            [{
                                "role": "user",
                                "content": "Who won the UCL in 2019?"
                            }])),
    style="assistant",
)

('YES', [TopLogprob(token='YES', bytes=[89, 69, 83], logprob=-8.89548e-06)], {'completion_tokens': 1, 'prompt_tokens': 160, 'total_tokens': 161})


In [291]:
console.print(
    asyncio.run(
        mitigateAndComplete(llm, "gpt-4o",
                            [{
                                "role": "user",
                                "content": "Who won the UCL in 2022?"
                            }])),
    style="assistant",
)

('YES', [TopLogprob(token='YES', bytes=[89, 69, 83], logprob=-0.00010175513)], {'completion_tokens': 1, 'prompt_tokens': 160, 'total_tokens': 161})


In [292]:
console.print(
    asyncio.run(
        mitigateAndComplete(
            llm,
            "gpt-4o",
            [
                {
                    "role": "assistant",
                    "content": "You have a call from contact name 'John Doe'",
                },
                {
                    "role":
                    "user",
                    "content":
                    "I'm driving I cannot talk right now reject it. Send him a message saying that",
                },
            ],
        )),
    style="assistant",
)

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-3.392825e-05)], {'completion_tokens': 1, 'prompt_tokens': 181, 'total_tokens': 182})
SEARCH REQUIRED LABEL:  NO


## Validation

As we're already mitigating the possibility of hallucination in our completion only we just need to verify of the output we've received is part of the available function calls and adheres to the schema of that call.


In [198]:
CallInteraction.model_validate({
    "name": "CallInteraction",
    "parameters": {
        "interaction_type": "REJECT"
    }
}["parameters"])

CallInteraction(interaction_type='REJECT')

In [366]:
from pydantic import ValidationError

In [367]:
def verifyFunctionNameAndSchema(function_call_output):
    name2validation = {
        "CallInteraction":
        lambda fco: CallInteraction.model_validate(fco.get("parameters")),
        "Answer":
        lambda fco: NonToolAnswer.model_validate(fco.get("parameters")),
        "MakeCall":
        lambda fco: MakeCall.model_validate(fco.get("parameters")),
        "SendMessage":
        lambda fco: SendMessage.model_validate(fco.get("parameters")),
        "AudioInteraction":
        lambda fco: AudioInteraction.model_validate(fco.get("parameters")),
        "MapInteraction":
        lambda fco: MapInteraction.model_validate(fco.get("parameters")),
    }
    if function_call_output.get("name") in name2validation:
        try:
            name2validation[function_call_output.get("name")](
                function_call_output)
            return True, ""
        except ValidationError as err:
            return False, str(err)
    else:
        return False, "Function Call Not Available!"

## Mitigate + Complete + Validate


In [368]:
async def mitigateCompleteValidate(llm: BaseLLM, model: str,
                                   messages: List[Dict]):
    output = -1
    function_calls, token_usage, latency = await mitigateAndComplete(
        llm, model, messages)
    vst = time()
    # if not function_calls or function_calls == -1:
    #     return -1
    if all([
            verifyFunctionNameAndSchema(function_call)[0]
            for function_call in function_calls
    ]):
        # return function_calls
        output = function_calls
    # else:
    #     return -1
    latency = f"{(time() - vst + float(latency)):.4f}"
    return output, token_usage, latency

In [296]:
console.print(
    asyncio.run(
        mitigateCompleteValidate(
            llm,
            "gpt-4o",
            [
                {
                    "role": "assistant",
                    "content": "You have a call from contact name 'John Doe'",
                },
                {
                    "role":
                    "user",
                    "content":
                    "I'm driving I cannot talk right now reject it. Send him a message saying that",
                },
            ],
        )),
    style="assistant",
)

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-8.89548e-06)], {'completion_tokens': 1, 'prompt_tokens': 181, 'total_tokens': 182})
SEARCH REQUIRED LABEL:  NO


In [369]:
console.print(
    asyncio.run(
        mitigateCompleteValidate(llm, "gpt-4o",
                                 [{
                                     "role": "user",
                                     "content": "Who won the UCL in 2022?"
                                 }])),
    style="assistant",
)

In [299]:
console.print(
    asyncio.run(
        mitigateCompleteValidate(llm, "gpt-4o",
                                 [{
                                     "role": "user",
                                     "content": "Who won the UCL in 2023?"
                                 }])),
    style="assistant",
)

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-0.28119066)], {'completion_tokens': 1, 'prompt_tokens': 160, 'total_tokens': 161})
SEARCH REQUIRED LABEL:  YES


In [300]:
console.print(
    asyncio.run(
        mitigateCompleteValidate(llm, "gpt-4o",
                                 [{
                                     "role": "user",
                                     "content": "Who won the UCL in 2024?"
                                 }])),
    style="assistant",
)

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-6.869018e-06)], {'completion_tokens': 1, 'prompt_tokens': 160, 'total_tokens': 161})
SEARCH REQUIRED LABEL:  YES


In [301]:
console.print(
    asyncio.run(
        mitigateCompleteValidate(
            llm,
            "gpt-4o",
            [{
                "role": "user",
                "content": "Navigate to Church Street, Bangalore"
            }],
        )),
    style="assistant",
)

('NO', [TopLogprob(token='NO', bytes=[78, 79], logprob=-4.4537377e-05)], {'completion_tokens': 1, 'prompt_tokens': 156, 'total_tokens': 157})
SEARCH REQUIRED LABEL:  NO


## LLM-as-a-Judge

We're validating the function call against its availability and the schema generated for the function call against what we've defined for it.

Now for a set of multi-turn instruction let's use the **LLM-as-a-Judge** approach to verify the quality of the response on a scale of 0 to 4.

We'll use the `gpt-4-turbo` model as the Judge.


In [370]:
judge_llm = OpenAILLM(model="gpt-4-turbo")

In [371]:
## judge output schema


class JudgeOutput(BaseModel):
    evaluation: str = Field(
        ...,
        description=
        "Your rationale for the rating, explaining how well the function call aligns with the user's request",
    )
    rating: int = Field(
        ...,
        description="Your rating, as a number between 1 and 4.",
        ge=1,
        le=4)


judge_functions = [{
    "name": "JudgeSystemGeneratedResponse",
    "description":
    "Provide your score for the system generated output in context of the user messages",
    "parameters": JudgeOutput.model_json_schema(),
}]

In [379]:
JUDGE_PROMPT = """You will be provided with a system generated response for a set of user messages in the form of function calls that the system proposes to handle the user's request.
Your task is to evaluate how effectively the system generated response meets the user's needs as expressed by the user.

The responses could be functions calls for various actions such as 'MakeCall', 'CallInteraction', 'SendMessage', 'AudioInteraction', 'MapInteraction', and 'Answer'.
Each function call will have parameters detailing the action to be taken.

Here is the scale you should use to build your answer:
1: The response is completely inappropriate: it does not address the user's request at all or is grossly incorrect.
2: The response is partially appropriate: it addresses some aspects of the user's request but misses other important aspects.
3: The response is mostly appropriate: it adequately addresses the user's request but could include more precise parameters or additional relevant actions.
4: The response is excellent: it perfectly matches the user's request and includes all necessary details and parameters.

Apart from scoring you also need to provide you rationale behind the scoring as mentioned in the function schema.
Always output between the <functioncall></functioncall> block.
"""

In [380]:
async def judgeResponse(
    llm: BaseLLM,
    model: str,
    messages: List[Dict],
    system_generated_response: List[Dict],
):
    st_time = time()
    messages = ([{
        "role": "system",
        "content": JUDGE_PROMPT
    }] + messages + [{
        "role":
        "assistant",
        "content":
        f"System Generated Response: '{system_generated_response}'",
    }])
    fc_available, function_call, usage = await llm.__function_call__(
        messages, model, judge_functions)
    if not fc_available:
        return {"rating": 0, "evaluation": "Unable to rank!"}
    ranking_parameters = function_call[0].get("parameters")
    latency = time() - st_time
    return ranking_parameters, usage, f"{latency:.4f}"

In [381]:
messages = [
    {
        "role": "system",
        "content": JUDGE_PROMPT
    },
    {
        "role": "user",
        "content": "Navigate to Church Street, Bangalore"
    },
]
messages += [{
    "role":
    "assistant",
    "content":
    """System Generated Response: [{'name': 'MapInteraction', 'parameters': {'action': 'Start', 'is_update': False, 'stop': None}}]""",
}]
output = asyncio.run(
    judge_llm.__function_call__(messages, "gpt-4-turbo", judge_functions))
console.print(output)

In [308]:
messages = [
    {
        "role": "system",
        "content": JUDGE_PROMPT
    },
    {
        "role": "user",
        "content": "Navigate to Church Street, Bangalore"
    },
]
messages += [{
    "role":
    "assistant",
    "content":
    """System Generated Response: [{'name': 'MapInteraction', 'parameters': {'action': 'Start', 'is_update': False, 'stop': None}}]""",
}]
output = asyncio.run(
    judge_llm.__function_call__(messages, "gpt-4-turbo", judge_functions))
console.print(output)

In [374]:
op = asyncio.run(
    judgeResponse(
        judge_llm,
        "gpt-4-turbo",
        [
            {
                "role": "assistant",
                "content": "You have a call from contact name 'John Doe'",
            },
            {
                "role":
                "user",
                "content":
                "I'm driving I cannot talk right now reject it. Send him a message saying that",
            },
        ],
        """[
    {'name': 'CallInteraction', 'parameters': {'interaction_type': 'REJECT'}},
    {
        'name': 'SendMessage',
        'parameters': {
            'app_name': 'Phone',
            'meta': {'is_number': False, 'number': None, 'contact_name': 'John Doe'},
            'message_text': "I'm driving I cannot talk right now"
        }
    }
]""",
    ))
console.print(op, style="prompt")

# Evaluation with LLMJudge

There are $14$ multi-turn conversations available in `test_dataset/conversations.json` file. We'll be testing the system response returned by the `mitigateCompleteValidate` function using the `judgeResponse` function.


In [382]:
import json

conversations = json.loads(open("../test_dataset/conversations.json").read())

In [383]:
len(conversations)

14

In [384]:
from tqdm.auto import tqdm


async def evaluate(conversations):
    ratings = []
    mitigate_complete_validate_token_usage = []
    mitigate_complete_validate_latency = 0
    rating_token_usage = []
    rating_latency = 0
    outputs = []
    for _, conversation in enumerate(tqdm(conversations)):
        try:
            output, token_usage, latency = await mitigateCompleteValidate(
                llm, "gpt-4o", conversation)
            outputs.append(output)
            mitigate_complete_validate_latency += float(latency)
            mitigate_complete_validate_token_usage.append(token_usage)
            if not output == -1:
                rating, usage, latency = await judgeResponse(
                    judge_llm, "gpt-4-turbo", conversation, output)
                rating_latency += float(latency)
                rating_token_usage.append(usage)
                ratings.append(rating)
            else:
                ratings.append({"ranking": 0, "evaluation": ""})
        except Exception as err:
            pass
    return (
        ratings,
        mitigate_complete_validate_token_usage,
        mitigate_complete_validate_latency,
        rating_token_usage,
        rating_latency,
        outputs,
    )

In [385]:
evaluation_output = asyncio.run(evaluate(conversations))

  0%|          | 0/14 [00:00<?, ?it/s]

In [387]:
(
    ratings,
    mitigate_complete_validate_token_usage,
    mitigate_complete_validate_latency,
    rating_token_usage,
    rating_latency,
    outputs,
) = evaluation_output

In [394]:
combined_report = []
for ix in range(len(ratings)):
    combined_report.append({
        "input":
        conversations[ix],
        "system_response":
        outputs[ix],
        "rating":
        ratings[ix],
        "token_usage":
        mitigate_complete_validate_token_usage[ix],
    })
average_rating = sum([rating.get("rating")
                      for rating in ratings]) / len(ratings)
combined_report = {
    "report":
    combined_report,
    "average_rating":
    average_rating,
    "total_time_for_generating_system_responses":
    mitigate_complete_validate_latency,
    "average_time_for_generating_system_responses":
    mitigate_complete_validate_latency / len(outputs),
    "total_time_for_evaluating":
    rating_latency,
    "average_rating_time":
    rating_latency / len(ratings),
}

In [395]:
with open("./evaluation_report.json", "w") as fp:
    json.dump(combined_report, fp, indent=4)