# Setup Packages

In [1]:
# Download models more quickly
! export HF_HUB_ENABLE_HF_TRANSFER=1

In [2]:
import json
import re
from functools import reduce
from textwrap import dedent
from typing import Union

import torch
from outlines import models, generate
from outlines.fsm.json_schema import build_regex_from_schema, get_schema_from_signature
from transformers import AutoModelForCausalLM, AutoTokenizer

# Data

In [3]:
path = "/mnt/workdisk/eitan/09_tooluse/gorilla-main/gorilla/berkeley-function-call-leaderboard/eval_data_total.json"
examples = []
with open(path, "r") as ques_file:
    for line in ques_file:
        examples.append(json.loads(line))

In [8]:
example = examples[220]
question = example["question"]
test_category = example["question_type"]
functions = example["function"]

print("Question:\t", question)
print("Test Category:\t", test_category)
print("Functions:\t", functions)

Question:	 "Imagine you are playing a role-playing game and you want to create a new player profile. You decided to name your character 'DragonSlayer' and choose 'Warrior' as your class. You also want to start at level 5. After setting up your profile, you want to take a break and find a nearby concert to attend. You are currently in 'New York, NY' and you want to find a concert that plays 'Rock' music. Later in the evening, you decide to play a game of poker with a standard deck of 52 cards and a hand size of 5. What is the probability of getting a full house? The next day, you decide to go on a hike and you want to calculate the slope gradient between two geographical coordinates. The first point is [40.7128, -74.0060] (New York, NY) and the second point is [34.0522, -118.2437] (Los Angeles, CA). You want the slope gradient in 'degree'. Can you provide the information for all these scenarios?"
Test Category:	 parallel_multiple_function
Functions:	 [{'name': 'poker_probability.full_ho

In [10]:
functions

[{'name': 'poker_probability.full_house',
  'description': 'Calculate the probability of getting a full house in a poker game.',
  'parameters': {'type': 'dict',
   'properties': {'deck_size': {'type': 'integer',
     'description': 'The size of the deck. Default is 52.'},
    'hand_size': {'type': 'integer',
     'description': 'The size of the hand. Default is 5.'}},
   'required': ['deck_size', 'hand_size']}},
 {'name': 'calculate_slope_gradient',
  'description': 'Calculate the slope gradient between two geographical coordinates.',
  'parameters': {'type': 'dict',
   'properties': {'point1': {'type': 'array',
     'items': {'type': 'float'},
     'description': 'The geographic coordinates for the first point [Latitude, Longitude].'},
    'point2': {'type': 'array',
     'items': {'type': 'float'},
     'description': 'The geographic coordinates for the second point [Latitude, Longitude].'},
    'unit': {'type': 'string',
     'enum': ['degree', 'percent', 'ratio'],
     'descriptio

# Functions to Regex

In [17]:
GORILLA_TO_OPENAPI = {
    "integer": "integer",
    "number": "number",
    "float": "number",
    "string": "string",
    "boolean": "boolean",
    "bool": "boolean",
    "array": "array",
    "list": "array",
    "dict": "object",
    "object": "object",
    "tuple": "array",
    "any": "string",
    "byte": "integer",
    "short": "integer",
    "long": "integer",
    "double": "number",
    "char": "string",
    "ArrayList": "array",
    "Array": "array",
    "HashMap": "object",
    "Hashtable": "object",
    "Queue": "array",
    "Stack": "array",
    "Any": "string",
    "String": "string",
    "Bigint": "integer",
}

In [18]:
def _cast_to_openai_type(properties, mapping, test_category):
    for key, value in properties.items():
        if "type" not in value:
            properties[key]["type"] = "string"
        else:
            var_type = value["type"]
            if mapping == GORILLA_TO_OPENAPI and var_type == "float":
                properties[key]["format"] = "float"
                properties[key]["description"] += " This is a float type value."
            if var_type in mapping:
                properties[key]["type"] = mapping[var_type]
            else:
                properties[key]["type"] = "string"

        # Currently support:
        # - list of any
        # - list of list of any
        # - list of dict
        # - list of list of dict
        # - dict of any

        if properties[key]["type"] == "array" or properties[key]["type"] == "object":
            if "properties" in properties[key]:
                properties[key]["properties"] = _cast_to_openai_type(
                    properties[key]["properties"], mapping, test_category
                )
            elif "items" in properties[key]:
                properties[key]["items"]["type"] = mapping[
                    properties[key]["items"]["type"]
                ]
                if (
                    properties[key]["items"]["type"] == "array"
                    and "items" in properties[key]["items"]
                ):
                    properties[key]["items"]["items"]["type"] = mapping[
                        properties[key]["items"]["items"]["type"]
                    ]
                elif (
                    properties[key]["items"]["type"] == "object"
                    and "properties" in properties[key]["items"]
                ):
                    properties[key]["items"]["properties"] = _cast_to_openai_type(
                        properties[key]["items"]["properties"], mapping, test_category
                    )
    return properties

In [19]:
def bfcl_function_to_schema(function, test_category):
    properties = _cast_to_openai_type(function["parameters"]["properties"], GORILLA_TO_OPENAPI, test_category)
    schema = json.dumps({
        "title": function["name"],
        "type": "object",
        "description": function["description"],
        "properties": properties,
        "required": function["parameters"]["required"],
        })
    return schema

In [20]:
def function_to_regex(function, test_category, whitespace_pattern, verbose):

    if isinstance(function, dict) and list(function.keys()) == ['name', 'description', 'parameters']:
        schema = bfcl_function_to_schema(function, test_category)
    elif callable(function):
        schema = json.dumps(get_schema_from_signature(function))
    else:
        raise TypeError
    if verbose >= 1:
        print(schema)

    schema_regex = build_regex_from_schema(schema, whitespace_pattern)
    function_regex = f'{{"function": "{function["name"]}", "arguments": {schema_regex}}}'

    if verbose >= 2:
        print(function_regex)

    return function_regex

In [21]:
def regex_or(pattern1, pattern2):
    return f"(?:{pattern1}|{pattern2})"

In [31]:
def repeat_pattern(pattern, num_repeats=None):
    """Repeat the regex pattern `pattern` `num_repeats` times.

    If `num_repeats` is `None`, allow the pattern to be repeated an unlimited number of times.
    If `num_repeats` is an integer, repeat the pattern exactly `num` times.
    If `num_repeats` is an iterable with length two, repeat the pattern anywhere between `num[0]` and `num[1]` times, inclusive.
    """
    if num_repeats is None:
        result = f"({pattern})*"
    elif isinstance(num_repeats, int):
        result = f"({pattern}){{{num_repeats}}}"
    elif isinstance(num_repeats, Union[list, tuple, set]) and len(num_repeats) == 2:
        return f"({pattern}){{{num_repeats[0]},{num_repeats[1]}}}"
    return result

In [23]:
# pattern_1 = "5"
# print("Pattern 1:", pattern_1)
# assert re.search(pattern_1, "5").group() == "5"
# assert re.search(pattern_1, "55").group() == "5"
# assert re.search(pattern_1, "555").group() == "5"

# # repeat unlimited times
# pattern_2 = repeat_pattern(pattern_1)
# print("Pattern 2:", pattern_2)
# assert re.search(pattern_2, "5").group() == "5"
# assert re.search(pattern_2, "55").group() == "55"
# assert re.search(pattern_2, "555").group() == "555"

# # repeat exactly 1x
# pattern_3 = repeat_pattern(pattern_1, 1)
# print("Pattern 3:", pattern_3)
# assert re.search(pattern_3, "5").group() == "5"
# assert re.search(pattern_3, "55").group() == "5"
# assert re.search(pattern_3, "555").group() == "5"

# # repeat exactly 2x
# pattern_4 = repeat_pattern(pattern_1, 2)
# print("Pattern 4:", pattern_4)
# assert re.search(pattern_4, "5") is None
# assert re.search(pattern_4, "55").group() == "55"
# assert re.search(pattern_4, "555").group() == "55"

# # repeat exactly 3x
# pattern_5 = repeat_pattern(pattern_1, 3)
# print("Pattern 5:", pattern_5)
# assert re.search(pattern_5, "5") is None
# assert re.search(pattern_5, "55") is None
# assert re.search(pattern_5, "555").group() == "555"

# # repeat between 2x and 3x
# pattern_6 = repeat_pattern("5", (2, 3))
# print("Pattern 6:", pattern_6)
# assert re.search(pattern_6, "5") is None
# assert re.search(pattern_6, "55").group() == "55"
# assert re.search(pattern_6, "555").group() == "555"

In [55]:
def maybe_enforce_pattern(function_regex, start_word="<tool_call>", end_word="</tool_call>"):
    """
    Enforce the pattern `function_regex` in between when we see `start_word` and `end_word`.
    """
    return f".*?(?={start_word}){start_word}({function_regex}).*?(?={end_word}){end_word}.*"

In [56]:
def all_functions_to_regex_str(functions, test_category, whitespace_pattern, n_tool_calls=None, verbose=0):
    """

    Specify the number of tools calls you can make.
    If `n_tool_calls` is `None`, allow the tool(s) to be repeated an unlimited number of times.
    If `n_tool_calls` is an integer, use exactly `n_tool_calls` tools.
    If `n_tool_calls` is an iterable with length two, repeat the pattern anywhere between `n_tool_calls[0]` and `n_tool_calls[1]` times, inclusive.
    """

    # Get a separate regex for each individual function
    function_regexes = [function_to_regex(function, test_category, whitespace_pattern, verbose) for function in functions]

    # Create a single regex that allows for any of the possible functions to be called
    function_regex = reduce(regex_or, function_regexes)

    # # Enforce pattern
    # function_regex = maybe_enforce_pattern(function_regex)

    # Allow multiple function calls or zero function calls
    function_regex = repeat_pattern(function_regex, n_tool_calls)

    if verbose >= 2:
        print(function_regex)

    return function_regex

In [53]:
# whitespace_pattern = None
# n_tool_calls = 2
# functions_regex_str = all_functions_to_regex_str(
#     functions,
#     test_category,
#     whitespace_pattern,
#     n_tool_calls=n_tool_calls,
#     verbose=1
#     )
# functions_regex_str

# Initialize Outlines Generator

In [26]:
# device = "cuda" if torch.cuda.is_available() else "cpu"
token = "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"
model_path = "mistralai/Mistral-7B-v0.1"

In [27]:
tokenizer = AutoTokenizer.from_pretrained(model_path, token=token)
llm = AutoModelForCausalLM.from_pretrained(
    model_path,
    device_map="auto",
    torch_dtype=torch.bfloat16,
    output_attentions=True,
    token=token,
)
model = models.Transformers(llm, tokenizer)
model

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

<outlines.models.transformers.Transformers at 0x7f4ecae3df90>

In [17]:
# generator = generate.regex(model, functions_regex_str)
# generator.format_sequence = lambda x: x
# # generator.format_sequence = lambda x: json.loads(x) # to work with json; from https://github.com/outlines-dev/outlines/blob/078f8223b6d8970ca6cc12d6c17659868e993691/outlines/generate/json.py#L60
# generator

<outlines.generate.api.SequenceGenerator at 0x7f8628106350>

In [28]:
def get_generator(functions, model, whitespace_pattern, n_tool_calls, test_category, verbose=0):
    functions_regex_str = all_functions_to_regex_str(
        functions,
        test_category,
        whitespace_pattern,
        n_tool_calls=n_tool_calls,
        verbose=verbose,
        )

    generator = generate.regex(model, functions_regex_str)
    generator.format_sequence = lambda x: x
    # generator.format_sequence = lambda x: json.loads(x) # to work with json; from https://github.com/outlines-dev/outlines/blob/078f8223b6d8970ca6cc12d6c17659868e993691/outlines/generate/json.py#L60

    return generator

In [57]:
whitespace_pattern = None
n_tool_calls = 1
verbose = 1

generator = get_generator(functions, model, whitespace_pattern, n_tool_calls, test_category, verbose)
generator

{"title": "poker_probability.full_house", "type": "object", "description": "Calculate the probability of getting a full house in a poker game.", "properties": {"deck_size": {"type": "integer", "description": "The size of the deck. Default is 52."}, "hand_size": {"type": "integer", "description": "The size of the hand. Default is 5."}}, "required": ["deck_size", "hand_size"]}
{"title": "calculate_slope_gradient", "type": "object", "description": "Calculate the slope gradient between two geographical coordinates.", "properties": {"point1": {"type": "array", "items": {"type": "number"}, "description": "The geographic coordinates for the first point [Latitude, Longitude]."}, "point2": {"type": "array", "items": {"type": "number"}, "description": "The geographic coordinates for the second point [Latitude, Longitude]."}, "unit": {"type": "string", "enum": ["degree", "percent", "ratio"], "description": "The unit for the slope gradient. Default is 'degree'."}}, "required": ["point1", "point2"]

<outlines.generate.api.SequenceGenerator at 0x7f4c8050c8d0>

# Prompt

In [58]:
def format_prompt(user_prompt, functions, apply_chat_template, system_prompt=None):

    # Format functions as string
    functions_str = "\n".join([str(function) for function in functions])

    # Setup system prompt
    if system_prompt is None:
        system_prompt_tuple = (
            "You are a helpful assistant and an expert in function calling.",
            "You have access to several functions which are represented in json schemas.",
            "Here are the functions:\n{functions}\n",
            "If you are requested to use a function, you ALWAYS output functions in a valid json schema.",
            "If there are no relevant functions, please ask for more information before making the function call.",
            "Please plan out all the function calls you want to make and explain why you need to make each of them.",
            "Think step by step.",
            )
        system_prompt = ' '.join(system_prompt_tuple)
        system_prompt = system_prompt.format(functions=functions_str)
    elif system_prompt is False:
        system_prompt = ""
    else:
        system_prompt = system_prompt.format(functions=functions_str)


    # Setup chat template
    if apply_chat_template:
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}]
        prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    else:
        prompt = f"{system_prompt}\n{user_prompt}"

    return prompt

In [59]:
# user_prompt = "Analyze the performance of the L.A Lakers in their last game and give me the field goal percentage and free throw percentage. Also, compare the team's points per game (ppg) average from 2018-2019 and 2019-2020 season."
# user_prompt = "Analyze the performance of the L.A Lakers in their last game and give me the field goal percentage and free throw percentage."
# user_prompt = "Compare the L.A Laker's points per game (ppg) average from 2018-2019 and 2019-2020 season."

apply_chat_template = False
prompt = format_prompt(question, functions, apply_chat_template)
print(prompt)

You are a helpful assistant and an expert in function calling. You have access to several functions which are represented in json schemas. Here are the functions:
{'name': 'poker_probability.full_house', 'description': 'Calculate the probability of getting a full house in a poker game.', 'parameters': {'type': 'dict', 'properties': {'deck_size': {'type': 'integer', 'description': 'The size of the deck. Default is 52.'}, 'hand_size': {'type': 'integer', 'description': 'The size of the hand. Default is 5.'}}, 'required': ['deck_size', 'hand_size']}}
{'name': 'calculate_slope_gradient', 'description': 'Calculate the slope gradient between two geographical coordinates.', 'parameters': {'type': 'dict', 'properties': {'point1': {'type': 'array', 'items': {'type': 'number'}, 'description': 'The geographic coordinates for the first point [Latitude, Longitude].'}, 'point2': {'type': 'array', 'items': {'type': 'number'}, 'description': 'The geographic coordinates for the second point [Latitude, 

# Generate Text

In [60]:
rng_seed = 420
rng = torch.Generator(device="cuda")
rng.manual_seed(rng_seed)

<torch._C.Generator at 0x7f4cc0730910>

In [61]:
result = generator(prompt, rng=rng, max_tokens=300)
result

'and the following output: ((denoted the parts you hardcode like scenario #5): (\'Full\', 588, 44786.44205188)), ((\'Tokyo, Japan\', \'World\', 103.56287 4.7109718231)), (6)), (28.8555)), (\'full house\', \'hand\', [\'5 of Hearts\', \'5 of Spades\', \'Two of Diamonds\', \'King of Clubs\', \'Queen of Clubs\'])), (\'Next\', [\'Concert\', [\'New York, NY\', \'Rock\', (((((5440 built), {]) (]) 3), 1], [] (], []; []; ], [], [], \'[26.4025757 299.625324], [35.6581082 228.334363]\'])) (], [], ((( 1.1230453065) (0]; []; ], ((( ] 83.8555454], [], []) ((((( (""([], (93.8587993; [\'New York, NY\']), [], [], [\'%\', 0, [ [0; (11.0'

In [50]:
result.split("}{")

['{"function": "poker_probability.full_house", "arguments": {"deck_size": 52, "hand_size": 5}',
 '"function": "calculate_slope_gradient", "arguments": {"point1": [40.7128, -74.006], "point2": [34.0522, -118.2437]}',
 '"function": "concert.find_nearby", "arguments": {"location": "New York, NY", "genre": "Rock"}}']

In [40]:
question

'"Imagine you are playing a role-playing game and you want to create a new player profile. You decided to name your character \'DragonSlayer\' and choose \'Warrior\' as your class. You also want to start at level 5. After setting up your profile, you want to take a break and find a nearby concert to attend. You are currently in \'New York, NY\' and you want to find a concert that plays \'Rock\' music. Later in the evening, you decide to play a game of poker with a standard deck of 52 cards and a hand size of 5. What is the probability of getting a full house? The next day, you decide to go on a hike and you want to calculate the slope gradient between two geographical coordinates. The first point is [40.7128, -74.0060] (New York, NY) and the second point is [34.0522, -118.2437] (Los Angeles, CA). You want the slope gradient in \'degree\'. Can you provide the information for all these scenarios?"'

In [22]:
idx = result.find("}{")
x = result[:idx + 1]
json.loads(x)

{'function': 'sport_analysis.compare_ppg',
 'arguments': {'team': 'lakers', 'seasons': ['2018-2019', '2019-2020']}}

In [23]:
y = result[idx + 1:]
json.loads(y)

{'function': 'sport_analysis.last_game_performance',
 'arguments': {'team': '1996 warriors',
  'details': ['field goal %', 'free throw %']}}

In [82]:
result = {'function': 'sport_analysis.compare_ppg', 'arguments': {'team': 'lakers', 'seasons': ['2018-2019', '2019-2020']}}

def format_result(result):
    args, function_name = result["arguments"], result["function"]
    args_string = ', '.join([f"{key}='{value}'" if isinstance(value, str) else f"{key}={value}" for key, value in args.items()])
    output_string = f'[{function_name}({args_string})]'
    return output_string

"[sport_analysis.compare_ppg(team='lakers', seasons=['2018-2019', '2019-2020'])]"

In [None]:
result = ''{"function": "calc_binomial_probability", "arguments": {"n":20, "k":5, "p":0.6}}''

In [83]:
import os
os.mkdir('./result/mistralai/Mistral-7B-v0.1')

FileNotFoundError: [Errno 2] No such file or directory: './result/mistralai/Mistral-7B-v0.1'