# Functional Equivalence Heuristic Comparison

## Setup

In [None]:
# Install NL2CMD accuracy metric

! rm -rf bashlint/ metric/
! git clone https://github.com/IBM/clai.git
! git -C ./clai checkout nlc2cmd
! cp -r clai/utils/bashlint bashlint
! cp -r clai/utils/metric metric
! rm -rf clai
! sed -i 's/import collections/import collections.abc as collections/' bashlint/butils.py

In [2]:
# Make results folder

! mkdir feh_results

In [None]:
# Imports

import os
import csv
import requests
import json

from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from metric import metric_utils
from sentence_transformers import SentenceTransformer
from sentence_transformers.util import cos_sim
from sklearn.feature_extraction.text import TfidfVectorizer

from datasets import load_dataset
from icalfa import submit_command
from openai import OpenAI
from tabulate import tabulate
from tqdm import tqdm

In [4]:
# API keys

# Prefer credentials that are already exported in the environment so real keys
# never have to be typed into (and saved with) the notebook. The '...' strings
# are placeholders only: replace them, or export the variables before starting
# the kernel, or every OpenAI-backed heuristic will fail to authenticate.

# setdefault leaves an already-exported key untouched instead of overwriting it
# with the placeholder (presumably this variable is read by icalfa -- confirm).
os.environ.setdefault('ICALFA_OPENAI_API_KEY', '...')

# Client used by openai() below for the non-execution GPT heuristics.
client = OpenAI(api_key=os.environ.get('OPENAI_API_KEY', '...'))

In [5]:
# Ollama and OpenAI functions

# System message sent to the judge model by both openai() and ollama().
# Those functions compare the reply against the literal string "true", so the
# instruction to answer only 'true' or 'false' is what makes the reply parseable.
system_prompt = "You will be given a task and two Bash commands. The first command is the ground truth. If the second command accomplishes the task, return true. Otherwise, return false. Only output 'true' or 'false'."

def openai(prompt, model):
    """Ask an OpenAI chat model for a true/false verdict and return it as 1 or 0.

    Sends `system_prompt` as the system message and `prompt` as the user
    message through the module-level `client`, with temperature 0 and a fixed
    seed.

    Args:
        prompt: User message describing the task and the two commands.
        model: Name of the OpenAI chat model to query.

    Returns:
        1 if the reply, ignoring surrounding whitespace, line breaks and
        letter case, is exactly "true"; 0 for any other reply, including a
        response that carries no text.
    """
    completion = client.chat.completions.create(
        model=model,
        messages=[
            {'role': 'system', 'content': system_prompt},
            {'role': 'user', 'content': prompt}
        ],
        temperature=0,
        seed=123
    )
    content = completion.choices[0].message.content
    # The message content is optional in the API response; without this guard a
    # text-less reply would raise AttributeError on .replace() below.
    if content is None:
        return 0
    # Drop line breaks as before, and additionally trim surrounding whitespace
    # so replies such as " true " are not misread as a negative verdict.
    verdict = content.replace("\n", "").replace("\r", "").strip().lower()
    return 1 if verdict == "true" else 0
    
def ollama(prompt, model):
    """Ask a local Ollama chat model for a true/false verdict and return 1 or 0.

    Posts a non-streaming chat request to the Ollama server on localhost:11434
    with `system_prompt` as the system message and `prompt` as the user message.

    Args:
        prompt: User message describing the task and the two commands.
        model: Name of the Ollama model to query.

    Returns:
        1 if the reply, ignoring surrounding whitespace, line breaks and
        letter case, is exactly "true"; 0 for any other reply; None when the
        server answers with a non-200 status (the error text is printed).
    """
    url = "http://localhost:11434/api/chat"
    payload = json.dumps({
        "model": model,
        "messages": [
            {'role': 'system', 'content': system_prompt},
            {"role": "user", "content": prompt}
        ],
        "stream": False,
        # Sampling parameters must be nested under "options" for /api/chat;
        # as top-level keys they are not applied, so the temperature and seed
        # intended to make the judgement repeatable had no effect.
        "options": {
            "temperature": 0,
            "seed": 123
        }
    })
    response = requests.post(url, data=payload)
    if response.status_code != 200:
        print(f"Error creating request: {response.text}")
        # Explicit so the "no verdict" outcome is visible to readers; callers
        # compare the result with 1, so None counts as a negative verdict.
        return None

    content = response.json()['message']['content']
    # Drop line breaks as before, and additionally trim surrounding whitespace
    # so replies such as " true " are not misread as a negative verdict.
    verdict = content.replace("\n", "").replace("\r", "").strip().lower()
    return 1 if verdict == "true" else 0

## Heuristics

In [6]:
# Bleu FEH
def bleu(index, prompt, ground_truth_command, model_command):
    """Judge equivalence with a character-level BLEU score.

    Both commands are split into individual characters, so the n-grams scored
    by sentence_bleu are character n-grams (equal weights for orders 1 to 4,
    smoothed with SmoothingFunction().method1). `index` and `prompt` are
    accepted only to match the shared heuristic signature and are not used.

    Returns:
        True when the score is strictly greater than 0.75, otherwise False.
    """
    reference_chars = [char for char in ground_truth_command]
    candidate_chars = [char for char in model_command]
    score = sentence_bleu(
        [reference_chars],
        candidate_chars,
        weights=(0.25, 0.25, 0.25, 0.25),
        smoothing_function=SmoothingFunction().method1,
    )
    return score > 0.75

# NL2CMD FEH
def nl2cmd(index, prompt, ground_truth_command, model_command):
    """Judge equivalence with the metric from metric_utils.compute_metric.

    The model command is scored against the ground truth with a prediction
    confidence of 1. The raw score is then mapped through (score + 1) / 2,
    which rescales it to [0, 1] assuming compute_metric returns a value in
    [-1, 1] (TODO confirm against the metric package). `index` and `prompt`
    are accepted only to match the shared heuristic signature.

    Returns:
        True when the rescaled score is strictly greater than 0.75.
    """
    raw_score = metric_utils.compute_metric(
        predicted_cmd=model_command,
        predicted_confidence=1,
        ground_truth_cmd=ground_truth_command,
    )
    rescaled_score = (raw_score + 1) / 2
    return rescaled_score > 0.75

# TF-IDF FEH
def tfidf(index, prompt, ground_truth_command, model_command):
    """Judge equivalence by the TF-IDF similarity of the two command strings.

    A fresh TfidfVectorizer is fitted on just the two commands, and the entry
    [0][1] of the product of the resulting matrix with its transpose is taken
    as the similarity between them. `index` and `prompt` are accepted only to
    match the shared heuristic signature.

    Returns:
        True when the similarity is strictly greater than 0.75. If
        vectorisation or the similarity computation raises an ordinary
        exception, the similarity is treated as 0 and False is returned.
    """
    try:
        vectorizer = TfidfVectorizer()
        # Named differently from the function so the local does not shadow it.
        doc_vectors = vectorizer.fit_transform([ground_truth_command, model_command])
        similarity = doc_vectors * doc_vectors.T
        tfidf_score = similarity.toarray()[0][1]
    except Exception:
        # Best-effort fallback kept from the original, but no longer a bare
        # `except:` -- KeyboardInterrupt and SystemExit must still be able to
        # stop a long evaluation run instead of being recorded as a score of 0.
        tfidf_score = 0
    return tfidf_score > 0.75

# Execution + TF-IDF FEH
def exec_tfidf(index, prompt, ground_truth_command, model_command):
    """Judge equivalence via icalfa's submit_command in "tfidf" mode.

    Only the dataset `index` and the `model_command` are forwarded; `prompt`
    and `ground_truth_command` are not passed on (presumably submit_command
    looks up the reference by index -- confirm in the icalfa documentation).

    Returns:
        True when submit_command returns a value equal to 1, otherwise False.
    """
    verdict = submit_command(
        index=index,
        command=model_command,
        eval_mode="tfidf",
    )
    return verdict == 1

# mxbai-embed FEH
# Instantiated once at module level and reused by every mxbai_embed() call,
# so the model is not re-created for each command pair.
embedding_model = SentenceTransformer("mixedbread-ai/mxbai-embed-large-v1")
def mxbai_embed(index, prompt, ground_truth_command, model_command):
    """Judge equivalence by cosine similarity of sentence embeddings.

    Both commands are encoded in one call to the module-level
    `embedding_model`, and cos_sim of the two resulting vectors is reduced to
    a Python scalar with .item(). `index` and `prompt` are accepted only to
    match the shared heuristic signature.

    Returns:
        True when the cosine similarity is strictly greater than 0.75.
    """
    vectors = embedding_model.encode([ground_truth_command, model_command])
    reference_vector = vectors[0]
    candidate_vector = vectors[1]
    similarity = cos_sim(reference_vector, candidate_vector).item()
    return similarity > 0.75

# Execution + mxbai-embed FEH
def exec_mxbai_embed(index, prompt, ground_truth_command, model_command):
    """Judge equivalence via icalfa's submit_command in "embed" mode.

    Forwards the dataset `index`, the `model_command` and an eval_param of
    0.75 (presumably the similarity threshold -- confirm in the icalfa
    documentation). `prompt` and `ground_truth_command` are not passed on.

    Returns:
        True when submit_command returns a value equal to 1, otherwise False.
    """
    verdict = submit_command(
        index=index,
        command=model_command,
        eval_mode="embed",
        eval_param=0.75,
    )
    return verdict == 1

# Llama3.1-8b-Instruct FEH
def llama3(index, prompt, ground_truth_command, model_command):
    """Judge equivalence by asking the "llama3.1:8b" model through ollama().

    The task and both commands are packed into a single user message.
    `index` is accepted only to match the shared heuristic signature.

    Returns:
        True when ollama() returns 1; False for any other result, including
        the None it yields when the request fails.
    """
    question = f"Task: {prompt}, Ground Truth Command: {ground_truth_command}, Model Command: {model_command}"
    verdict = ollama(question, "llama3.1:8b")
    return verdict == 1

# Execution + Llama3.1-8b-Instruct FEH
def exec_llama3(index, prompt, ground_truth_command, model_command):
    """Judge equivalence via icalfa's submit_command in "ollama" mode.

    Forwards the dataset `index`, the `model_command` and "llama3.1:8b" as
    eval_param (presumably the judge model name -- confirm in the icalfa
    documentation). `prompt` and `ground_truth_command` are not passed on.

    Returns:
        True when submit_command returns a value equal to 1, otherwise False.
    """
    verdict = submit_command(
        index=index,
        command=model_command,
        eval_mode="ollama",
        eval_param="llama3.1:8b",
    )
    return verdict == 1

# GPT-3.5-Turbo FEH
def gpt3(index, prompt, ground_truth_command, model_command):
    """Judge equivalence by asking "gpt-3.5-turbo-0125" through openai().

    The task and both commands are packed into a single user message.
    `index` is accepted only to match the shared heuristic signature.

    Returns:
        True when openai() returns 1, otherwise False.
    """
    question = f"Task: {prompt}, Ground Truth Command: {ground_truth_command}, Model Command: {model_command}"
    verdict = openai(question, "gpt-3.5-turbo-0125")
    return verdict == 1

# Execution + GPT-3.5-Turbo FEH
def exec_gpt3(index, prompt, ground_truth_command, model_command):
    """Judge equivalence via icalfa's submit_command in "openai" mode.

    Forwards the dataset `index`, the `model_command` and
    "gpt-3.5-turbo-0125" as eval_param (presumably the judge model name --
    confirm in the icalfa documentation). `prompt` and
    `ground_truth_command` are not passed on.

    Returns:
        True when submit_command returns a value equal to 1, otherwise False.
    """
    verdict = submit_command(
        index=index,
        command=model_command,
        eval_mode="openai",
        eval_param="gpt-3.5-turbo-0125",
    )
    return verdict == 1

# GPT-4 FEH
def gpt4(index, prompt, ground_truth_command, model_command):
    """Judge equivalence by asking "gpt-4-0613" through openai().

    The task and both commands are packed into a single user message.
    `index` is accepted only to match the shared heuristic signature.

    Returns:
        True when openai() returns 1, otherwise False.
    """
    question = f"Task: {prompt}, Ground Truth Command: {ground_truth_command}, Model Command: {model_command}"
    verdict = openai(question, "gpt-4-0613")
    return verdict == 1

# Execution + GPT-4 FEH
def exec_gpt4(index, prompt, ground_truth_command, model_command):
    """Judge equivalence via icalfa's submit_command in "openai" mode.

    Forwards the dataset `index`, the `model_command` and "gpt-4-0613" as
    eval_param (presumably the judge model name -- confirm in the icalfa
    documentation). `prompt` and `ground_truth_command` are not passed on.

    Returns:
        True when submit_command returns a value equal to 1, otherwise False.
    """
    verdict = submit_command(
        index=index,
        command=model_command,
        eval_mode="openai",
        eval_param="gpt-4-0613",
    )
    return verdict == 1

## Comparison

In [None]:
# One output CSV per heuristic. `fehs` lists the heuristic functions in the
# same order, so position i of both lists describes the same heuristic.
file_names = ['bleu.csv', 'nl2cmd.csv', 'tfidf.csv', 'exec_tfidf.csv', 'mxbai_embed.csv', 'exec_mxbai_embed.csv', 'llama3.csv', 'exec_llama3.csv', 'gpt3.csv', 'exec_gpt3.csv', 'gpt4.csv', 'exec_gpt4.csv']

fehs = [bleu, nl2cmd, tfidf, exec_tfidf, mxbai_embed, exec_mxbai_embed, llama3, exec_llama3, gpt3, exec_gpt3, gpt4, exec_gpt4]

dataset = load_dataset("westenfelder/NL2SH-ALFA", "test", split="train")

# rotate dataset by 10 positions to create non-equivalent commands
# The index list is sized from the dataset itself instead of a hard-coded 300,
# so every row gets a partner even if the split size differs. (With 10 or
# fewer rows the rotation is the identity and each row would pair with itself.)
indices = list(range(len(dataset)))
shuffled = indices[-10:] + indices[:-10]

# Ensure the output folder exists even when the setup cell was not run.
os.makedirs("feh_results", exist_ok=True)

for name, feh in zip(file_names, fehs):
    result_path = f"feh_results/{name}"
    if os.path.exists(result_path):
        print(f"{name} already exists, skipping")
        continue

    print(f"Creating {name}")

    # Write to a temporary name and rename only after every row is written.
    # Otherwise a run interrupted halfway leaves a truncated CSV that the
    # "already exists" check above would treat as finished on the next run.
    partial_path = f"{result_path}.partial"

    # newline='' is what the csv module expects for files it writes; without
    # it, some platforms emit an extra blank line after every row.
    with open(partial_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Task', 'Ground Truth Command', 'Model Command', 'Functional Equivalence', 'Heuristic Output'])

        for data_index, row in tqdm(enumerate(dataset), total=len(dataset)):
            prompt = row['nl']
            ground_truth_command = row['bash']
            # 'bash2' is used as the known-equivalent alternative command
            func_equiv_command = row['bash2']
            # the ground truth of the rotated partner row serves as the negative example
            not_func_equiv_command = dataset[shuffled[data_index]]["bash"]
            # get heuristic output
            func_equiv_feh_output = feh(data_index, prompt, ground_truth_command, func_equiv_command)
            not_func_equiv_feh_output = feh(data_index, prompt, ground_truth_command, not_func_equiv_command)

            writer.writerow([prompt, ground_truth_command, func_equiv_command, "True", func_equiv_feh_output])
            writer.writerow([prompt, ground_truth_command, not_func_equiv_command, "False", not_func_equiv_feh_output])

    os.replace(partial_path, result_path)

In [None]:
# Print results

def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Build one table row per heuristic from its CSV: confusion-matrix counts plus
# precision, recall, F1 and accuracy, then print the table as LaTeX.
results = [["Heuristic", "TP", "FP", "TN", "FN", "Precision", "Recall", "F1 Score", "Accuracy"]]
for name in file_names:
    true_positive = 0
    false_positive = 0
    true_negative = 0
    false_negative = 0
    unrecognized = 0

    # newline='' lets the csv module handle line endings (and quoted fields
    # containing newlines) itself.
    with open(f"feh_results/{name}", mode='r', newline='') as file:
        read = csv.reader(file)
        next(read, None)  # skip the header row
        for row in read:
            # Blank or short lines carry no label/output pair; skip them
            # rather than failing with IndexError on row[3] / row[4].
            if len(row) < 5:
                continue
            label, output = row[3], row[4]
            if label == "True" and output == "True":
                true_positive += 1
            elif label == "True" and output == "False":
                false_negative += 1
            elif label == "False" and output == "True":
                false_positive += 1
            elif label == "False" and output == "False":
                true_negative += 1
            else:
                # e.g. an empty output cell written when a heuristic returned None
                unrecognized += 1

    if unrecognized:
        print(f"{name}: ignored {unrecognized} rows with unrecognized label/output values")

    # A heuristic that never answers True (or a file with no positive rows)
    # makes a denominator zero; report 0.00 instead of raising ZeroDivisionError.
    precision = _safe_ratio(true_positive, true_positive + false_positive)
    recall = _safe_ratio(true_positive, true_positive + false_negative)
    f1 = _safe_ratio(2 * precision * recall, precision + recall)
    acc = _safe_ratio(true_positive + true_negative, true_positive + true_negative + false_positive + false_negative)

    results.append([name, true_positive, false_positive, true_negative, false_negative, f"{precision:0.2f}", f"{recall:0.2f}", f"{f1:0.2f}", f"{acc:0.2f}"])

latex_table = tabulate(results, headers="firstrow", tablefmt="latex")
print(latex_table)