## **Setup**

In [1]:
import os
import openai
import signal
import tqdm.notebook as tqdm
import random
import numpy as np
import matplotlib.pyplot as plt
import json
import pdb
import torch
from llm import timeout, lm
from itertools import combinations
from cp_utils import temperature_scaling, get_llm_preds, get_top_logprobs_orig, get_non_conformity_score_multi_label
from prompt_init import get_init_prompt_chat, get_reason_prompt, get_pred_prompt
from utils import get_combs, remove_last_line, get_all_possible_options_multi_label, get_mc_dataset, process_mc_multi_label
from process_results import get_results_multi_label, get_llm_preds_multi_label

In [None]:
# Read the OpenAI key from the OPENAI_API_KEY environment variable so the secret
# never has to be pasted into (and saved with) the notebook. When the variable
# is unset this falls back to the empty placeholder.
openai.api_key = os.environ.get("OPENAI_API_KEY", "")
# Model identifier. NOTE(review): not referenced by any other cell of this
# notebook — presumably consumed by the `llm` helpers; verify.
gpt_model = "gpt-4-1106-preview"

## **Prompt Specification**

In [None]:
# Few-shot prompt for multiple-choice generation: a fixed description of the
# robot's surroundings followed by four worked Scene / Task / Options examples.
# It is passed to get_init_prompt_chat together with scenario_test_prompt.
mc_gen_prompt_template = """You are simulating a robot operating in an office kitchen. You are in front of a counter with two closed drawers, a top one and a bottom one. There is also a landfill bin, a recycling bin, and a compost bin.

Scene: On the counter, there is an orange soda, a Pepsi, and an apple.
Task: Put that drink in the top drawer.
Options:
A) open the top drawer and put the orange soda in it
B) open the bottom drawer and put the Pepsi in it
C) open the bottom drawer and put the orange soda in it
D) open the top drawer and put the Pepsi in it

Scene: On the counter, there is an energy bar, a banana, and a microwave.
Task: Put the snack next to the microwave.
Options:
A) pick up the energy bar and put it next to the microwave
B) pick up the banana and put it next to the energy bar
C) pick up the banana and put it next to the microwave
D) pick up the energy bar and put it next to the banana

Scene: On the counter, there is a Coke, a Sprite, and a sponge.
Task: Can you dispose of the can? It should have expired.
Options:
A) pick up the sponge and put it in the landfill bin
B) pick up the Coke and put it in the recycling bin
C) pick up the Sprite and put it in the recycling bin
D) pick up the Coke and put it in the landfill bin

Scene: On the counter, there is a bottled water, a bag of jalapeno chips, and a bag of rice chips.
Task: I would like a bag of chips.
Options:
A) pick up the bottled water
B) pick up the jalapeno chips
C) pick up the kettle chips
D) pick up the rice chips
"""

# Per-scenario suffix in the same Scene / Task / Options layout as the examples
# above. The two {} slots take the object list and the task instruction
# (presumably filled in by get_init_prompt_chat — verify there).
scenario_test_prompt = """Scene: On the counter, there is {}.
Task: {}
Options:
"""

In [None]:
# Few-shot prompt used while building the knowledge base. Each example shows the
# options, the ground-truth action(s) and a "You:" explanation; the knowledge-base
# cell appends a new scenario ending in "You:" so the LLM continues with the
# explanation for that scenario.
reason_gen_prompt_template = """You are simulating a robot operating in an office kitchen. You are in front of a counter with two closed drawers, a top one and a bottom one. There is also a landfill bin, a recycling bin, and a compost bin.
For each scene and task provided, analyze the options, identify the options that align with the ground truth, and summarize the uncertainty.

Scene: On the counter, there is an orange soda, a Pepsi, and an apple.
Task: Put that drink in the top drawer.
Options:
A) open the top drawer and put the orange soda in it
B) open the bottom drawer and put the Pepsi in it
C) open the bottom drawer and put the orange soda in it
D) open the top drawer and put the Pepsi in it
Ground Truth Action(s): A, D
You: The task's phrase "Put that drink in the top drawer" is ambiguous, as it doesn't specify whether the "drink" refers to the orange soda or the Pepsi. This ambiguity makes both options A and D potentially valid choices.

Scene: On the counter, there is an energy bar, a dirty sponge with food residue, and a bottled unsweetened tea.
Task: I don't want to use it for cleaning any more. Can you please dispose of it?
Options:
A) pick up the dirty sponge with food residue and put it in the landfill bin
B) pick up the energy bar and put it in the landfill bin
C) pick up the energy bar and put it in the recycling bin
D) pick up the bottled unsweetened tea and put it in the landfill bin
Ground Truth Action(s): A
You: The task suggests disposal of a cleaning item, pointing to the "dirty sponge with food residue". Sponges, especially dirty ones, are generally non-recyclable. Hence, option A, placing the sponge in the landfill bin, is the appropriate action.

Scene: On the counter, there is a Coke, a Sprite, and a sponge.
Task: Can you dispose of the can? It should have expired.
Options:
A) pick up the sponge and put it in the landfill bin
B) pick up the Coke and put it in the recycling bin
C) pick up the Sprite and put it in the recycling bin
D) pick up the Coke and put it in the landfill bin
Ground Truth Action(s): B, C
You: The instruction "dispose of the can" refers to either Coke or Sprite, but doesn't specify which. Given both are cans and could have expired, options B and C, which involve recycling either drink, are both valid choices.
"""

## **Load Pre-generated Dataset**
KnowNo assumes there exists a distribution of scenarios that the robot may encounter. A scenario refers to an environment with objects, a (potentially ambiguous) language instruction, and a goal. Here we provide a dataset of 600 scenarios sampled from a pre-defined scenario distribution, including the corresponding prompts for the LLM.

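We will also split the dataset into a calibration set (the first `num_calibration_data` scenarios of the file) and a test set (the last `num_test_data` scenarios); both sizes are set in the next cell.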

In [None]:
# Split sizes: how many scenarios are taken from the start of the scenario file
# for calibration and how many from its end for testing.
num_calibration_data, num_test_data = 200, 100

In [None]:
# Load the scenario descriptions; individual scenarios are separated by a blank line.
scenario_info_path = './data/mobile_manipulation.txt'
with open(scenario_info_path, 'r', encoding='utf-8') as f:
    scenario_info_text = f.read().split('\n\n')

# Calibration and test scenarios must be disjoint: the test set is supposed to be
# held out from the data that determines the conformal threshold.
assert num_calibration_data + num_test_data <= len(scenario_info_text), (
    f"need {num_calibration_data + num_test_data} scenarios for disjoint "
    f"calibration/test splits, but the file only has {len(scenario_info_text)}"
)

# Calibration scenarios come from the head of the file, test scenarios from the tail.
scenario_info_text_train = scenario_info_text[:num_calibration_data]
# An explicit start index is used because `[-0:]` would return the whole list
# when num_test_data is 0.
scenario_info_text_test = scenario_info_text[len(scenario_info_text) - num_test_data:]

In [None]:
# Scenarios for the knowledge base live in a separate file with the same
# blank-line-separated layout. Note that scenario_info_path is re-pointed here.
scenario_info_path = './data/mobile_manipulation_knowledge.txt'
with open(scenario_info_path, 'r') as f:
    scenario_info_text_k = f.read().split('\n\n')

In [None]:
def _init_prompts(scenarios):
    """Build the initial MC-generation prompts for one list of scenario texts."""
    return get_init_prompt_chat(scenarios, scenario_test_prompt, mc_gen_prompt_template)


# All three splits share the same scenario suffix and few-shot prompt template.
calibration_set = _init_prompts(scenario_info_text_train)
test_set = _init_prompts(scenario_info_text_test)
knowledge_base = _init_prompts(scenario_info_text_k)


## **Multiple Choice Question Answering**
For each scenario, we first apply few-shot prompting to generate a set of plausible options for the robot to take.

In [None]:
# Generate the multiple-choice options for the calibration scenarios
# (presumably one LLM query per scenario — see utils.get_mc_dataset).
# NOTE(review): this cell overwrites its own input, so re-running it passes the
# already-processed set back into get_mc_dataset — confirm that is safe.
calibration_set = get_mc_dataset(calibration_set)

In [None]:
# Generate the multiple-choice options for the test scenarios
# (presumably one LLM query per scenario — see utils.get_mc_dataset).
# NOTE(review): this cell overwrites its own input, so re-running it passes the
# already-processed set back into get_mc_dataset — confirm that is safe.
test_set = get_mc_dataset(test_set)

In [None]:
# Generate the multiple-choice options for the knowledge-base scenarios
# (presumably one LLM query per scenario — see utils.get_mc_dataset).
# NOTE(review): this cell overwrites its own input, so re-running it passes the
# already-processed set back into get_mc_dataset — confirm that is safe.
knowledge_base = get_mc_dataset(knowledge_base)

## **Knowledge base construction**

In [None]:
# Build the knowledge base of few-shot examples: for every knowledge scenario the
# LLM is asked to explain the ground-truth action(s), and the scenario, options,
# explanation and labels are stored together as one "train prompt".
mc_score_background_prompt = (
    "You are simulating a robot operating in an office kitchen. "
    "You are in front of a counter with two closed drawers, a top one and a bottom "
    "one. There is also a landfill bin, a recycling bin, and a compost bin. Please follow the template exactly to generate your response."
)
# Slots, in order: scenario text, formatted options, LLM explanation, ground-truth labels.
train_prompt_template = "{}\nOptions:\n{}\nExplain: {}\nPrediction: {}"
all_train_prompts = []

# The range starts at len(all_train_prompts), which is 0 right after the reset above.
for i in tqdm.trange(len(all_train_prompts), len(knowledge_base)):
    entry = knowledge_base[i]

    # Parse the raw generated options and look up which of them are acceptable.
    mc_gen_full, mc_gen_all = process_mc_multi_label(entry['mc_gen_raw'].strip())
    _, poss_options, _ = get_all_possible_options_multi_label(entry['info'], mc_gen_all)
    gt_labels = ", ".join(poss_options)

    # Only the last paragraph of the generation prompt describes this scenario.
    cur_scenario_prompt = entry['mc_gen_prompt'].split('\n\n')[-1].strip()
    mc_score_prompt = "\n".join([
        reason_gen_prompt_template,
        cur_scenario_prompt,
        mc_gen_full,
        f"Ground Truth Action(s): {gt_labels}",
        "You:",
    ])

    # The completion after "You:" is the explanation for the ground truth.
    _, explanation = lm(mc_score_prompt, logit_bias={})
    entry['mc_score_prompt'] = mc_score_prompt

    # Keep only the part of the scenario prompt before its "Options" header.
    scenario = cur_scenario_prompt.split("Options")[0].strip()
    all_train_prompts.append(
        train_prompt_template.format(scenario, mc_gen_full, explanation, gt_labels)
    )

## **Scenario embeddings**

In [None]:
from sentence_transformers import SentenceTransformer

# Sentence-BERT checkpoint used for retrieval; another SBERT model name can be
# substituted here.
model_name = "sentence-transformers/paraphrase-distilroberta-base-v2"
model = SentenceTransformer(model_name)


def get_sentence_embeddings(sentences):
    """Encode ``sentences`` with the module-level Sentence-BERT ``model``.

    Args:
        sentences: whatever ``model.encode`` accepts; this notebook passes both
            a list of strings and a single string.

    Returns:
        The embeddings exactly as returned by ``model.encode``.
    """
    return model.encode(sentences)

In [None]:
# Background prompt (same text as in the knowledge-base cell); it is read as a
# module-level global by get_test_predictions.
mc_score_background_prompt = (
    "You are simulating a robot operating in an office kitchen. "
    "You are in front of a counter with two closed drawers, a top one and a bottom "
    "one. There is also a landfill bin, a recycling bin, and a compost bin. Please follow the template exactly to generate your response."
)

# The retrieval key of each stored example is the second line of its prompt
# (presumably the "Task: ..." line, given the Scene/Task layout — verify).
scenario_prompts = [train_prompt.split("\n")[1] for train_prompt in all_train_prompts]
sen_embeddings = get_sentence_embeddings(scenario_prompts)

## **Deployment**

In [None]:
def _yes_score(score_prompt, temperature=5):
    """Ask the LLM a Y/N question and return the temperature-scaled score of "Y".

    A single token is generated with a strong logit bias on two token ids.
    NOTE(review): 56 and 45 look like the "Y" and "N" token ids of the GPT-4
    tokenizer — confirm whenever the model changes.

    Raises:
        KeyError: if "Y" or "N" is missing from the returned top log-probs
            (same behaviour as the original inline code).
    """
    response, _ = lm(score_prompt, max_tokens=1, logprobs=True, top_logprobs=20,
                     logit_bias={56: 100.0, 45: 100.0})
    logprobs_full = get_top_logprobs_orig(response)
    scaled = temperature_scaling([logprobs_full["Y"], logprobs_full["N"]], temperature=temperature)
    return scaled[0]


def get_test_predictions(test_set, use_pred=True, top_k=3, temperature=5):
    """Score every candidate answer set for each scenario and store the ranking.

    For each entry of ``test_set`` the function:

    1. retrieves the ``top_k`` knowledge-base prompts (``all_train_prompts``)
       whose embedding has the largest dot product with the embedding of the
       second line of the scenario prompt;
    2. lets the LLM complete a few-shot prompt built from those examples;
    3. asks one Y/N question per candidate set from
       ``get_combs(["A", "B", "C", "D"])``, plus one for "no valid option";
    4. sorts the candidate sets by their scaled "Y" score, highest first.

    Each entry is updated in place with the keys ``true_options``,
    ``poss_options``, ``flex_options``, ``mc_gen_full``, ``mc_gen_all``,
    ``whole_prompt``, ``top_lists`` and ``top_probs``.

    Relies on the module-level globals ``all_train_prompts``,
    ``sen_embeddings`` and ``mc_score_background_prompt``.

    Args:
        test_set: indexable collection of dicts providing the keys
            ``'mc_gen_raw'``, ``'mc_gen_prompt'`` and ``'info'``.
        use_pred: when False, everything from the first ``"Prediction: "``
            onwards is cut from the scoring prompt.
        top_k: number of knowledge-base examples to retrieve (default 3, the
            value that used to be hard-coded).
        temperature: temperature passed to ``temperature_scaling`` (default 5,
            the value that used to be hard-coded).

    Returns:
        The same ``test_set`` object, with its entries updated.
    """
    # Loop-invariant pieces of the scoring questions.
    all_options = ["A", "B", "C", "D"]
    set_template = "\nIs the set {} valid according to the user's request? Reply 'Y' if it exactly matches all valid options, and 'N' if it includes any invalid options or is a subset of the valid options."
    empty_template = "\n\nIs there no valid option? Reply 'Y' for yes, and 'N' for no."

    for i in tqdm.trange(len(test_set)):
        test_data = test_set[i]

        mc_gen_raw = test_data['mc_gen_raw'].strip()
        mc_gen_full, mc_gen_all = process_mc_multi_label(mc_gen_raw)

        # Retrieve the top-k most similar knowledge-base prompts.
        prompt = test_data['mc_gen_prompt'].split("\n\n")[-1].strip()
        test_embed = get_sentence_embeddings(prompt.split("\n")[1])
        # atleast_1d keeps argsort working when the knowledge base has one entry
        # and squeeze() would otherwise produce a 0-d array.
        sims = np.atleast_1d((test_embed @ sen_embeddings.T).squeeze())
        topk_idx = np.argsort(-sims)[:top_k]
        top_prompts = np.take(all_train_prompts, topk_idx).tolist()
        top_join_prompts = "\n\n".join(top_prompts)

        # Few-shot prompt -> LLM completion for this scenario.
        prompt_final_txt = mc_score_background_prompt + "\n\n" + top_join_prompts + "\n\n" + prompt + "\n" + mc_gen_full
        _, text = lm(prompt_final_txt, logit_bias={})

        true_options, poss_options, flexible_options = get_all_possible_options_multi_label(test_data['info'], mc_gen_all)
        test_data['true_options'] = true_options
        test_data['poss_options'] = poss_options
        test_data['flex_options'] = flexible_options
        test_data["mc_gen_full"] = mc_gen_full
        test_data["mc_gen_all"] = mc_gen_all
        test_data["whole_prompt"] = prompt_final_txt.strip() + "\n" + text

        # Prompt that the Y/N scoring questions are appended to.
        test_prompt = prompt + "\n" + mc_gen_full + "\n" + text
        if not use_pred:
            # Drop the model's own prediction so that only the preceding text remains.
            test_prompt = test_prompt.split("Prediction: ")[0].strip()
        score_base = mc_score_background_prompt + "\n\n" + test_prompt

        # get_combs is called per scenario because the result is appended to below.
        all_combs = get_combs(all_options)
        all_probs = []
        for option in all_combs:
            # Render the Python list as a set literal, e.g. ['A', 'B'] -> {'A', 'B'}.
            question = set_template.format(option).replace(" [", " {").replace("] ", "} ")
            all_probs.append(_yes_score(score_base + question, temperature))

        # The empty set is scored with its own question.
        all_probs.append(_yes_score(score_base + empty_template, temperature))
        all_combs.append([])

        # Rank candidate sets by their scaled "Y" score, highest first. The sort
        # is stable, so ties keep their original order.
        ranked = sorted(zip(all_probs, all_combs), key=lambda pair: pair[0], reverse=True)
        top_probs, top_lists = zip(*ranked)
        test_data['top_lists'] = list(top_lists)
        test_data['top_probs'] = list(top_probs)
    return test_set

In [None]:
# Score all candidate option sets for the calibration scenarios. With
# use_pred=False the "Prediction: ..." part of the LLM reply is cut from the
# scoring prompt. The entries are updated in place and the same object is returned.
calibration_set = get_test_predictions(calibration_set, use_pred=False)

In [None]:
# Score all candidate option sets for the test scenarios, using the same
# use_pred=False setting as for the calibration set.
test_set = get_test_predictions(test_set, use_pred=False)

### Specify target success rate and apply conformal prediction

In [None]:
# Desired success rate of the prediction sets.
target_success = 0.86
# Allowed error rate, i.e. the complement of the target success rate.
epsilon = 1.0 - target_success

In [None]:
non_conformity_score = get_non_conformity_score_multi_label(calibration_set)

# Use the number of scores actually produced rather than the configured split
# size, so the quantile level always matches the data it is applied to.
num_scores = len(non_conformity_score)
assert num_scores > 0, "no calibration scores available"

# Quantile level ceil((n + 1) * (1 - epsilon)) / n. It is capped at 1.0 because
# np.quantile raises for q > 1, which happens for small n or a very high target
# success rate.
q_level = min(1.0, np.ceil((num_scores + 1) * (1 - epsilon)) / num_scores)
# method='higher' selects an actual calibration score instead of interpolating.
qhat = np.quantile(non_conformity_score, q_level, method='higher')

# Build the prediction sets for the test scenarios from the calibrated threshold.
test_set = get_llm_preds_multi_label(test_set, qhat)

In [None]:
# Aggregate the test-set metrics and print a short report.
results = get_results_multi_label(test_set)

print('============== Summary ==============')
print("============== Test set =============")
# `!s` forces str(), matching how print() renders a positional argument.
print(f"Number of bag data: {num_calibration_data!s}")
print(f"Number of test data: {len(test_set)!s}")
print(f"Average prediction set size: {results['avg_prediction_set_size']!s}")
print(f"Success rate: {results['success_rate']!s}")