In [None]:
import guidance
from guidance import models, gen, one_or_more, select, zero_or_more, regex, optional, capture

# Base name of the model file. It is also embedded in the result and mismatch
# file names built further down in this file.
model_name = "your_model_name_here"

# Load "<model_name>.gguf" through guidance's LlamaCpp wrapper.
# NOTE(review): n_gpu_layers=-1 and n_ctx=2048 are passed through as-is; presumably
# "offload every layer to the GPU" and "2048-token context" — confirm against llama.cpp.
model = models.LlamaCpp(f"{model_name}.gguf", n_gpu_layers=-1, n_ctx=2048)

In [None]:
@guidance(stateless=False)
def drinkOrderCoffee(lm):
    """Constrain generation to the argument list of one ``DrinkOrder(...)`` call.

    ``validOrderCoffee`` emits the opening ``DrinkOrder(`` before calling this
    function; here the optional keyword arguments and the closing ``)`` are
    appended. Each field is offered as "emit it" versus the empty string, so
    the model may skip any of them. Candidate values are read from the
    module-level ``*_values`` lists, which must exist when this runs.

    Captures written on ``lm``: numberFlag, drinkTypeFlag/drinkTypeName,
    roastTypeFlag/roastTypeName, sizeFlag/sizeName, styleFlag/styleName,
    toppingsFlag and finishedListToppings.

    Args:
        lm: the guidance model state to extend.

    Returns:
        The extended model state, ending with ``)``.
    """
    # Raw string: "\d" inside a plain literal is an invalid escape sequence.
    # NOTE(review): if the model skips "number=", the next emitted field still
    # starts with ", " — the grammar does not prevent "DrinkOrder(, ...".
    lm += select(["number=" + regex(r"\d+"), ""], name='numberFlag')

    if drink_type_values:
        lm += select([", drink_type='" + select(drink_type_values, name='drinkTypeName') + "'", ""], name='drinkTypeFlag')

    if roast_type_values:
        # Own capture name; previously this reused 'drinkTypeFlag' and
        # overwrote the drink-type capture.
        lm += select([", roast_type='" + select(roast_type_values, name='roastTypeName') + "'", ""], name='roastTypeFlag')

    if size_values:
        lm += select([", size='" + select(size_values, name='sizeName') + "'", ""], name='sizeFlag')

    if style_values:
        lm += select([", style='" + select(style_values, name='styleName') + "'", ""], name='styleFlag')

    if topping_values:
        lm += select([", toppings=[", ""], name='toppingsFlag')
        if lm['toppingsFlag']:
            # At most three toppings, and never more than the menu offers.
            max_toppings = min(len(topping_values), 3)
            for i in range(max_toppings):
                lm += toppingCoffee()
                if i == max_toppings - 1:
                    # Cap reached: close the list ourselves. Offering ", "
                    # here would leave a dangling separator and no "]".
                    lm += "]"
                    break
                lm += select([", ", "]"], name="finishedListToppings")
                if lm['finishedListToppings'] == "]":
                    break

    return lm + ")"

@guidance(stateless=False)
def toppingCoffee(lm):
    """Append one grammar-constrained ``Topping(...)`` call to ``lm``.

    Emits ``Topping(name=``, then a quoted name picked from ``topping_values``,
    then an optional ``qualifier='...'`` picked from ``quantity_values``, then
    an optional ``negation=True`` (only offered when ``not_values`` is
    non-empty), and finally the closing ``)``.

    Captures written on ``lm``: toppingName, qualifierName, qualifierFlag,
    negationFlag.
    """
    lm += "Topping(name="

    if topping_values:
        quoted_name = "'" + select(topping_values, name='toppingName') + "'"
        lm += quoted_name

    if quantity_values:
        qualifier_clause = ", qualifier='" + select(quantity_values, name='qualifierName') + "'"
        # The empty alternative lets the model omit the qualifier.
        lm += select([qualifier_clause, ""], name='qualifierFlag')

    if not_values:
        lm += select([", negation=True", ""], name='negationFlag')

    return lm + ")"

@guidance(stateless=False)
def validOrderCoffee(lm):
    """Constrain generation to a Python-style list of up to three ``DrinkOrder(...)`` calls.

    Emits ``[``, then repeatedly lets the model choose between opening another
    ``DrinkOrder(`` (filled in by ``drinkOrderCoffee``) and closing the list
    with ``]``. After the third order the list is closed unconditionally. When
    ``drink_type_values`` is empty the result is just ``[]``.

    The choice is stored in the ``choice`` capture on ``lm``.

    Args:
        lm: the guidance model state to extend.

    Returns:
        The extended model state, ending with ``]``.
    """
    lm += "["

    # Nothing can be ordered without drink types (the check is loop-invariant).
    if not drink_type_values:
        return lm + "]"

    for i in range(3):
        # select() expects a list of options. The earlier select(", ", "") passed a
        # bare string as the options (iterated character by character) and "" as the
        # capture name. Folding the separator into the opener also rules out
        # "..., ]" and two orders with no separator between them.
        opener = "DrinkOrder(" if i == 0 else ", DrinkOrder("
        lm += select([opener, "]"], name='choice')
        if lm['choice'] == "]":
            return lm
        lm += drinkOrderCoffee()

    return lm + "]"

In [None]:
# Instruction text interpolated into the "### Instruction:" section of the prompt.
# It lists the Topping and DrinkOrder constructor signatures the model is told to
# use and states that the output is a list of those objects.
# This is runtime text: any edit changes what the model sees.
instruction_generate_coffee = """You are a helpful assistant. You have to take as input a customer order and output a list of the corresponding objects. You should use only the following classes in Python:
class Topping:
      def __init__(self, name: str, qualifier: Optional[str] = None, negation: Optional[bool] = False) -> None:

class DrinkOrder:
      def __init__(self, number: int = 1, drink_type: Optional[str] = None, size: Optional[str] = None, style: Optional[str] = None, roast_type: Optional[str] = None, toppings: Optional[List[Topping]] = None) -> None:

The output should be a list of those objects."""


In [None]:
import json
import spacy
from spacy.pipeline import EntityRuler
from spacy import displacy
import os
import re

def parse_line(line):
    """Split one tab-separated line into ``(phrase, category)``.

    The line is stripped of surrounding whitespace and split on tabs. Exactly
    two fields are required; anything else yields ``(None, None)``. The
    category is stripped again, the phrase is returned as split.
    """
    fields = line.strip().split('\t')
    try:
        phrase, category = fields
    except ValueError:
        # Too few or too many tab-separated fields.
        return None, None
    return phrase, category.strip()

def init_pipeline(dataset = "coffee", data_root = "FoodOrderingDataset/data"):
    """Build a spaCy pipeline whose entity ruler is seeded from the dataset's alias files.

    Loads ``en_core_web_sm``, inserts an ``entity_ruler`` before the ``ner``
    component (configured with ``phrase_matcher_attr="LOWER"``) and registers
    one pattern per valid line of every ``*.txt`` file found in
    ``<data_root>/<dataset>/alias``.

    Args:
        dataset: sub-directory of ``data_root`` holding the ``alias`` folder.
        data_root: directory containing the per-dataset folders. The default
            keeps the previously hard-coded location.

    Returns:
        The configured spaCy ``Language`` object.

    Raises:
        OSError: if the model or the alias directory/files cannot be read.
    """
    nlp = spacy.load("en_core_web_sm")
    ner_ruler = nlp.add_pipe("entity_ruler",
                             before="ner",
                             config={"phrase_matcher_attr": "LOWER"})

    def read_file_categories(food_type):
        """Collect ``{"pattern": phrase, "label": category}`` dicts from the alias files."""
        alias_dir = os.path.join(data_root, food_type, "alias")

        # Sorted so the pattern order does not depend on the filesystem's
        # directory listing order.
        alias_files = sorted(f for f in os.listdir(alias_dir) if f.endswith('.txt'))

        patterns = []
        for file_name in alias_files:
            # Distinct names for the file name and the handle (the handle used
            # to rebind the loop variable); explicit encoding so reading does
            # not depend on the platform default.
            with open(os.path.join(alias_dir, file_name), 'r', encoding='utf-8') as alias_file:
                for line in alias_file:
                    if not line.strip():
                        continue
                    phrase, category_info = parse_line(line)
                    if phrase and category_info:
                        patterns.append({"pattern": phrase, "label": category_info})

        return patterns

    ner_ruler.add_patterns(read_file_categories(dataset))
    return nlp


# Build the pipeline once with the default dataset; process_NER reads this module-level name.
nlp = init_pipeline()

def process_NER(input_order):
    """Run the module-level ``nlp`` pipeline on ``input_order``.

    Returns:
        A list of ``(entity_text, entity_label)`` tuples, one per entry of
        ``doc.ents`` and in the same order.
    """
    return [(entity.text, entity.label_) for entity in nlp(input_order).ents]

In [None]:
import json
import re
from collections import defaultdict

# Destination of the generated results; the file name embeds model_name.
file_path = f'FoodOrderingDataset/output/Ablation_NERCD_Coffee_Results_{model_name}.json'

# Collects one result record per processed example; dumped to file_path further down.
existing_data=[]

# Load the pre-processed coffee dataset. Its elements are later read with
# .get("input"), .get("output_extract") and .get("output_generate").
with open('FoodOrderingDataset/processed_data/coffee_dataset.json', 'r') as file:
    data = json.load(file)

# Flat menu description: every entry has the form "<category>(<value>)".
coffee_str = "['drink_type(americano)', 'drink_type(cappuccino)', 'drink_type(drip_coffee)', 'drink_type(espresso)', 'drink_type(hot_chocolate)', 'drink_type(latte)', 'not(not)', 'number(1)', 'number(10)', 'number(11)', 'number(12)', 'number(13)', 'number(14)', 'number(15)', 'number(2)', 'number(3)', 'number(4)', 'number(5)', 'number(6)', 'number(7)', 'number(8)', 'number(9)', 'quantity(extra)', 'quantity(light)', 'roast_type(cinnamon_roast)', 'roast_type(continental_roast)', 'roast_type(dark_roast)', 'roast_type(french)', 'roast_type(full_city_roast)', 'roast_type(guatemalan)', 'roast_type(italian)', 'roast_type(light_roast)', 'roast_type(medium_roast)', 'size(extra_large)', 'size(large)', 'size(regular)', 'size(small)', 'style(decaf)', 'style(flavored)', 'style(iced)', 'style(skinny)', 'topping(caramel_syrup)', 'topping(cinnamon)', 'topping(cinnamon_dolce_syrup)', 'topping(crumbles)', 'topping(drizzles)', 'topping(espresso_shot(1))', 'topping(espresso_shot(2))', 'topping(espresso_shot(3))', 'topping(espresso_shot(4))', 'topping(foam)', 'topping(hazelnut_syrup)', 'topping(honey)', 'topping(raspberry_syrup)', 'topping(syrup)', 'topping(vanilla_syrup)', 'topping(whipped_cream)']"
all_coffee = "\nMenu: " + coffee_str.lower()

items_found_str = re.search(r'\nMenu: \[(.*?)\]', all_coffee).group(1)

# The value part may itself contain one parenthesised group, e.g.
# "topping(espresso_shot(1))". Stopping at the first ")" (as "[^)]+" does)
# truncates such values to "espresso_shot(1".
items = re.findall(r'(\w+)\(((?:[^()]|\([^()]*\))+)\)', items_found_str)

# Group the values by category, keeping menu order within each category.
items_dict = defaultdict(list)
for item_type, item_value in items:
    items_dict[item_type].append(item_value)

items_dict = dict(items_dict)
keys_to_extract = ['topping', 'size', 'number', 'drink_type', 'roast_type', 'not', 'style', 'quantity']

# Publish "<key>_values" module-level lists (topping_values, size_values, ...);
# the guidance grammar functions read these names directly.
for key in keys_to_extract:
    globals()[f"{key}_values"] = items_dict.get(key, [])
    print(key, globals()[f"{key}_values"])

# Build one tuple per dataset entry:
# (raw input, NER-augmented input, expected output, NER tuples, NER summary string).
input_list = []
for obj in data:
    # Missing keys fall back to a placeholder string instead of raising.
    input_value = obj.get("input", "No input key found")
    # NOTE(review): output_value is read here but not used in the statements that follow.
    output_value = obj.get("output_extract", "No output key found")
    output_generate = obj.get("output_generate", "No output key found")
    # (entity text, entity label) pairs found in the raw input.
    used_items_value = process_NER(input_value)
    # Rendered as "text - label", comma-separated and lower-cased.
    used_items_value_decoupled = [x[0] + ' - ' + x[1] for x in used_items_value]
    used_items_str = ', '.join(used_items_value_decoupled).lower()

    # The NER summary is appended to the raw input to form the augmented input.
    input_augmented_file = input_value + "\nItems Found: " + used_items_str
    input_list.append((input_value, input_augmented_file, output_generate, used_items_value, used_items_str))

# Upper bound on how many examples are sent to the model in one run
# (indices 0..130, as before).
MAX_EXAMPLES = 131

for i, record in enumerate(input_list[:MAX_EXAMPLES]):
    print(f"{i}/{len(input_list)}")
    # `augmented_input` rather than `input`, so the builtin input() is not
    # shadowed for the rest of the kernel session.
    (initial_input, augmented_input, expected, used_items_value, used_items_str) = record
    # NOTE(review): every prompt line carries four leading spaces because the
    # literal is indented with the code; kept byte-for-byte so the model sees
    # the same prompt as before.
    lm = model + f'''\
    Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.
    ### Instruction:
    {instruction_generate_coffee}
    ### Input:
    {augmented_input}

    ### Response:
    '''

    # Generate under the order grammar and capture the whole generated text as "answer".
    ans = lm + capture(validOrderCoffee(), "answer")

    existing_data.append({"input":initial_input, "input_augmented": augmented_input, "output": ans["answer"], "expected":expected, "output_NER": used_items_str})

# Persist all collected records in one go.
with open(file_path, 'w') as file:
    json.dump(existing_data, file, indent=4)

In [None]:
import json
import re

def remove_duplicate_toppings(order_str: str) -> str:
    """Drop repeated ``Topping(...)`` entries from every ``toppings=[...]`` list.

    Within each list the first occurrence of a topping is kept and the
    survivors are re-joined with ``", "``; any other text inside the brackets
    is discarded. If the result contains a topping and has more ``[`` than
    ``]`` (a toppings list that was never closed swallowed the outer ``]``),
    the missing ``)]`` tail is appended.

    Args:
        order_str: generated order text, e.g. ``"[DrinkOrder(..., toppings=[...])]"``.

    Returns:
        The cleaned order text.
    """
    toppings_list_pattern = r"(toppings=\[)(.*?)(\])"
    # Quote-aware: a ")" inside a quoted value (e.g. 'espresso_shot(1)') does
    # not end the Topping(...) call.
    topping_pattern = r"Topping\((?:'[^']*'|[^)'])*\)"

    def dedupe(match: re.Match) -> str:
        prefix, content, suffix = match.groups()
        # dict.fromkeys removes duplicates while keeping first-seen order.
        unique_toppings = dict.fromkeys(re.findall(topping_pattern, content))
        return f"{prefix}{', '.join(unique_toppings)}{suffix}"

    cleaned_str = re.sub(toppings_list_pattern, dedupe, order_str, flags=re.DOTALL)

    # Only repair output that is actually truncated. Checking the suffix alone
    # also fired on well-formed orders whose last drink has no toppings and
    # corrupted them with an extra ")]".
    is_truncated = cleaned_str.count("[") > cleaned_str.count("]")
    if "Topping" in cleaned_str and not cleaned_str.endswith(")])]") and is_truncated:
        cleaned_str += ")]"
    return cleaned_str

In [None]:
import json

def calculate_accuracy_and_save_mismatches(json_file, output_file):
    """Score generated orders by exact match and save the records that differ.

    Each record's ``output`` is passed through ``remove_duplicate_toppings``
    and lower-cased, then compared with the lower-cased ``expected`` string.
    Mismatching records are printed (cleaned output, then expected) and
    written to ``output_file`` as JSON.

    Args:
        json_file: path of the results JSON (a list of records with
            ``output`` and ``expected`` keys).
        output_file: path the mismatching records are written to.

    Returns:
        Exact-match accuracy as a percentage; ``0.0`` when the results file
        holds no records.

    Raises:
        OSError: if either file cannot be opened.
        KeyError: if a record lacks ``output`` or ``expected``.
    """
    with open(json_file, 'r') as file:
        data = json.load(file)

    total = len(data)
    correct = 0
    mismatches = []

    for item in data:
        expected = item['expected'].lower()
        output = remove_duplicate_toppings(item['output']).lower()

        if output == expected:
            correct += 1
        else:
            print('\n\n')
            print(output)
            print(expected)
            mismatches.append(item)

    # An empty results file used to raise ZeroDivisionError before the
    # mismatch file was written.
    accuracy = (correct / total) * 100 if total else 0.0

    with open(output_file, 'w') as outfile:
        json.dump(mismatches, outfile, indent=4)

    return accuracy

# Results file to score and the file that receives the mismatching records;
# both names embed model_name.
json_file = f'FoodOrderingDataset/output/Ablation_NERCD_Coffee_Results_{model_name}.json'
mismatch_file = f'FoodOrderingDataset/output/Ablation_NERCD_Coffee_Mismatches_{model_name}.json'

# Score the run and report the exact-match accuracy as a percentage.
accuracy = calculate_accuracy_and_save_mismatches(json_file, mismatch_file)
print(f"Accuracy: {accuracy:.2f}%")
print(f"Mismatches have been saved to: {mismatch_file}")