# Main

## Imports

In [1]:
%pwd

'/Users/liamroy/Documents/Studies/Monash_31194990/PHD/Studies/Study_04/LLM_vocab_optimization/scripts'

In [2]:
# Imports
import sys
import numpy as np
import random
import time
random.seed(time.time())

from itertools import product

from openai import OpenAI
client = OpenAI()

from robots_and_modules.helper_functions import llm_prompt_reply
from robots_and_modules.prompt_builder import build_prompt

## Define Inputs

In [11]:
## Configuration parameters
attempt_ID = '00'

iteration_quantity = 100
llm_model = "gpt-4o"                    # gpt-3.5-turbo     | Use this for dev/testing
                                        # gpt-4 / gpt-4o    | Use this when deployed, more expensive
                                        # gpt-4o-mini       | Lightweight, less expensive than gpt4o
temperature_coefficient = 1.0           # Moderately stochastic @ 0.6 to 1.0
frequency_penalty_coefficient = 1.0     # Lightly penalize repetition @ 0.2
top_p_coefficient=1.0                   # Nucleus sampling for controlled randomness @ 0.85 to 0.6
omission_probability = 0.5

# Default to go1_obj and 'spread' state space
robot_module = 'go1'        # 'go1' or 'jackal'
state_space = 'spread'      # 'spread' or 'minimized'

printer = True # True or None

### Check Robot Module and State Space

In [None]:
# Check if robot_module and state_space are within the bounds of go1/jackal and spread/minimized
if robot_module not in ['go1', 'jackal']:
    print(f"Error: Invalid robot module specified. Using default: {robot_module}")
if state_space not in ['spread', 'minimized']:
    print(f"Error: Invalid state space specified. Using default: {state_space}")


# Import the specified robot module as robot_obj
robot_obj = __import__("robots_and_modules." + robot_module)

### Select State Space based on Module

In [9]:
# The data in this should be in the format: "State Number: [State Name, State Description]"
# Used for Go1 robot
if robot_module == 'go1_obj':
    if state_space == 'spread':
        set_of_states = [
        "S01: [Waiting for Input, The robot is in standby mode waiting for a command from the user.]",
        "S02: [Analyzing Object, The robot is analyzing a target object in front of it on the ground.]", 
        "S03: [Found Object, The robot has found a target object in front of it on the ground.]",
        "S04: [Needs Help, The robot is experiencing an error and needs help from the user.]",
        "S05: [Confused, The robot is confused and unsure what to do.]",
        "S06: [Unsure, It is unclear as the robot does not appear to be in any of the described states.]"
        ]
        
    elif state_space == 'minimized':
        set_of_states = [
        "S01: [Waiting for Input, The robot is in standby mode waiting for a command from the user.]",
        "S02: [Interacting with Object, The robot is pointing out a target object in front of it on the ground.]", 
        "S03: [Needs Help, The robot is in a confused error staate and needs help from the user.]",
        "S04: [Unsure, It is unclear as the robot does not appear to be in any of the described states.]"
        ]



        
# Used for Jackal robot
elif robot_module == 'jackal_obj':
    if state_space == 'spread':
        set_of_states = [
            "S01: [Processing, The robot is analyzing the request and planning the navigation route.]",
            "S02: [Navigating, The robot is actively navigating toward the destination and does not require assistance.]",
            "S03: [Danger, The robot is signaling for the user's attention due to a detected hazard.]",
            "S04: [Stuck, The robot is signaling for the user's attention as its path is blocked.]",
            "S05: [Accomplished, The robot has successfully reached the requested destination.]",
            "S06: [Unsure, The robot's statr is unclear as it does not appear to be in any of the described states.]"
        ]

    elif state_space == 'minimized':
        set_of_states = [
            "S01: [Progressing, The robot is progressing on the assigned task and does not require assistance.]",
            "S02: [Alert, The robot is signaling for the user's attention due to a possible hazard or blocked path.]",
            "S03: [Accomplished, The robot has successfully reached the requested destination.]",
            "S04: [Unsure, The robot's statr is unclear as it does not appear to be in any of the described states.]"
        ]

### Create Robot Instance | Set Assistant Prompt and Context

In [None]:
llm_assistant_prompt = "You are an expert roboticist and understand how to design communicative expressions for human-robot interaction."

robot_instance = robot_obj.Robot(set_of_states)

deployment_context = f"Consider a scenario where you are collaborating with a {robot_instance.form_factor} robot to locate and pick strawberries in a strawberry patch."

## Accuracy Proxy Generator

### Generate the Accuracy Proxy .npy Array

In [None]:
# Initialize error and total counters
error_counter = 0
total_counter = 0

# Create robot object
robot_instance = robot_obj.Robot(set_of_states)

# Get action space shape, and use it to create a for loop that itterates all indices
action_space_shape = robot_instance.get_action_space_shape()

# Itterate through all possible actions in action space
# count = 0
# for indices in product(*(range(dim) for dim in action_space_shape)):
#     count += 1
#     print(f"Action {count}:")
#     robot_instance.set_active_parameter(list(indices))
#     print(robot_instance.active_parameters)
#     print(robot_instance.generate_description(omission_probability))
#     print("\n\n")

# Set active parameters based on action space shape for robot_instance
test_value_A = [0, 0, 1, 0, 1, 0]  # Example values within range for each parameter
test_value_B = [1, 2, 0, 2, 0, 1]  # Example values within range for each parameter
test_value_C = [0, 1, 2, 1, 1, 2]  # Example values within range for each parameter
set_set = [test_value_A, test_value_B, test_value_C]

for test_values in set_set:
    robot_instance.set_active_parameter(test_values)

    # Five iterations for loop
    for iteration in range(iteration_quantity):

        if printer:
            print(f"**********************************************************************")
            print(f"ITERATION {iteration+1} FOR {robot_instance.active_parameters}")
            print(f"**********************************************************************")

        # Generate description with test values and omission probability
        raw_description = robot_instance.generate_description(omission_probability)

        # Prepare a promt with context + robot description
        summary_prompt = f"{deployment_context} Summarize the explanation below, focusing on describing the robot's actions in this scenario:\n{raw_description}"
        
        ### SWAP UNCOMMENT TO DISABLE TEST AND ENABLE LLM
        # Summarize the context + robot description using llm 
        summarized_expression = raw_description
        # summarized_expression = llm_prompt_reply(prompt=summary_prompt, 
        #                                           client=client, 
        #                                           llm_model=llm_model, 
        #                                           llm_assistant_prompt=llm_assistant_prompt, 
        #                                           temperature_coefficient=temperature_coefficient, 
        #                                           frequency_penalty_coefficient=frequency_penalty_coefficient, 
        #                                           top_p_coefficient=top_p_coefficient)


        # Print before and after summary
        if printer:
            print(f"\nRAW DESCRIPTION:\n{raw_description}")
            print(f"\nLLM SUMMARY:\n{summarized_expression}")

        # Pass summarized expression to prompt builder, along with set of states and robot_instance parameters 
        acc_proxy_prompt = build_prompt(expression_string=summarized_expression, state_list=set_of_states, deployment_context=deployment_context, llm_assistant_prompt=llm_assistant_prompt)
        
        if printer:
            print(f"\n\nACCURACY PROXY PROMPT:\n~~~{acc_proxy_prompt}~~~\n\n")


        ### SWAP UNCOMMENT TO DISABLE TEST AND ENABLE LLM
        options = ["[S01, Waiting for Input]", "[S02, Analyzing Object]", "[S03, Found Object]", "[S04, Needs Help]", "[S05, Confused]", "[S06, Unsure]"]
        acc_proxy_reply = random.choice(options)
        # Run the prompt through the accuracy proxy model
        # acc_proxy_reply = llm_prompt_reply(acc_proxy_prompt, client, llm_model, llm_assistant_prompt, temperature_coefficient, frequency_penalty_coefficient, top_p_coefficient)

        # Parse the accuracy proxy reply to identify the state
        state_code, _ = acc_proxy_reply.strip("[]").split(", ")
        state_code = state_code.strip("'")
        
        # Check if the state code exists within the dictionary
        if state_code in robot_instance.action_space[tuple(test_values)]:
            # Increment the count for the identified state in the action space
            robot_instance.action_space[tuple(test_values)][state_code] += 1
            total_counter += 1

            if printer:
                # Output the identified state code
                print(f"Identified state code: {state_code}")
                # Output the updated action space
                print(f"Updated action space: {robot_instance.action_space[tuple(test_values)]}\n")

        else:
            # Output an error message if the state code does not exist
            error_counter += 1
            total_counter += 1
            
            if printer:
                print(f"Error: State code '{state_code}' does not exist in the action space.\n")

        #     # Log the reply from the accuracy proxy model to the robot_instance action_space
        #     robot_instance.action_space[1, 1, 0, 0, 1, 0] = acc_proxy_reply
    
    if printer:
        print(f"@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
        print(f"COMPLETED ALL {iteration_quantity} ITERATIONS FOR {robot_instance.active_parameters}")
        print(f"@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n\n")

if printer:
    print(f"$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
    print(f"COMPLETED ALL COMBINATIONS")
    print(f"$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n")


### Save the Array to External File

In [12]:
# Define the save string for the numpy array
save_string = robot_module + "_acc_array_" + attempt_ID + ".npy"

# Save the numpy array robot_instance.action_space to a file 
np.save(f"./../data/acc_arrays/{save_string}", robot_instance.action_space)

NameError: name 'robot_instance' is not defined

## Generate the Kinematic Distance Object

In [None]:
# PASS FOR NOW

## Evaluate the Cost Function

### This is all WIP below

In [14]:
# # Calculate the distance term for each action in action space based on the selected distance approach
# # distance_terms = calculate_distance_terms(robot_instance.action_space)

# # Analyze the action space for the robot_instance using cost function and determine the best action
# # best_action = analyze_action_space(robot_instance.action_space)

# # Log the best action to an external spreadsheet
# #log_data(.......)

# # Return the best action in terminal
# #print(f"\n\nBest action: {best_action}")



# Imports
import numpy as np
from itertools import product
import distance_calculator  # Assumed to contain distance functions

from robots_and_modules.go1 import Robot  # Import the Robot class




# Load accuracy proxy values from external file
ACCURACY_PROXY_FILE = "./../acc_arrays/go1_obj_acc_array_00.npy"



class PoseEvaluator:
    def __init__(self, motion_param_ranges, weight=0.5):
        """
        Initialize the evaluator.

        Parameters:
            motion_param_ranges (list of int): List containing the range of values for each motion parameter.   e.g. [3, 3, 3, 3]
            weight (float): Weighting factor for accuracy vs. distance in the cost function (0 <= w <= 1).      e.g. 0.5
        """
        self.motion_param_ranges = motion_param_ranges
        self.weight = weight
        self.poses_dict = {}  # Dictionary to store defined expressions
        self.accuracy_proxy = np.load(ACCURACY_PROXY_FILE)  # Load accuracy proxy values

    def get_accuracy(self, A):
        """
        Retrieves accuracy proxy value for a given pose A.

        Parameters:
            A (tuple): A motion parameter combination (n-vector).

        Returns:
            float: Accuracy proxy value for A.
        """
        return self.accuracy_proxy[tuple(A)]  # Access value based on indices in A

    def get_min_distance(self, A):
        """
        Computes the minimum distance between A and all defined expressions in the dictionary.

        Parameters:
            A (tuple): A motion parameter combination (n-vector).

        Returns:
            float: Minimum distance from A to any other expression. Returns 0 if no expressions exist.
        """
        if not self.poses_dict:  # If no other expressions exist
            return 0.0

        distances = [distance_calculator.calculate_distance("emd", np.array(A), np.array(B))
                     for B in self.poses_dict.values()]
        return min(distances)  # Return the minimum distance found

    def evaluate_cost(self, A):
        """
        Evaluates the cost function f(A, B).

        Parameters:
            A (tuple): A motion parameter combination (n-vector).

        Returns:
            float: Evaluated cost value.
        """
        accuracy_term = self.get_accuracy(A)
        distance_term = self.get_min_distance(A)
        cost = self.weight * accuracy_term + (1 - self.weight) * distance_term
        return cost

    def find_best_pose(self):
        """
        Iterates through all possible parameter value combinations to find the best pose.

        Returns:
            tuple: Best parameter combination with the highest cost value.
        """
        best_A = None
        best_cost = float("-inf")

        # Generate all possible motion parameter combinations
        for A in product(*[range(v) for v in self.motion_param_ranges]):
            cost = self.evaluate_cost(A)

            if cost > best_cost:
                best_cost = cost
                best_A = A

        return best_A, best_cost


# Example Usage
if __name__ == "__main__":
    # Define motion parameter ranges (e.g., each parameter can take values 0, 1, 2)
    motion_param_ranges = [3, 3, 3, 3]  # Example: 4 motion parameters, each in range 0-2

    evaluator = PoseEvaluator(motion_param_ranges, weight=0.7)  # Weighting accuracy more

    # Define some initial expressions in the dictionary
    evaluator.poses_dict = {
        "stuck": (3, 2, 1, 2),
        "progressing": (1, 0, 2, 1)
    }

    # Find the best pose based on the cost function
    best_pose, best_cost = evaluator.find_best_pose()

    print(f"Best Pose: {best_pose} with Cost: {best_cost}")


ModuleNotFoundError: No module named 'distance_calculator'