In [83]:
import numpy as np, matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from palettable.colorbrewer.qualitative import Set1_9
import os, math
from datetime import datetime
from IPython.display import HTML

pwd = os.path.abspath('') + "/"
print(pwd)

c:\Users\qbr5kx\OneDrive - University of Virginia\Desktop\UVA\PhD Scratch\Active_Epistemic_Inference\particle_filter_robot/


In [84]:
def softmax(x):
    """Compute softmax values for each sets of scores in x."""
    e_x = np.exp(x)
    return e_x / e_x.sum()

def log_stable(x):
    """Compute log values for each sets of scores in x."""
    return np.log(x + np.exp(-16))

class Agent:
    def __init__(self, state, agentType, velocity_options, heading_options, goals):
        self.state = state #This is the true state of the robot
        self.type = agentType
        self.velocity_options = velocity_options
        self.heading_options = heading_options
        self.goal_prior = np.ones(goals.shape[0])*1/goals.shape[0]
        self.likelihood = self.goal_prior
        self.posterior = self.goal_prior
        self.max_iterations = 100

    def update(self, heading, velocity, dt):
        """Update agent's position based on chosen velocity and heading."""
        self.state[0] += velocity * np.cos(heading) * dt
        self.state[1] += velocity * np.sin(heading) * dt
        self.state[2] = heading
    
    def get_measurements(self, observations, goals):
        """Return measurements of agent's position given observations."""
        for key in observations:
            if key['type'] == 'A':
                distance = np.linalg.norm(self.state-key['obs'])
                self.measured_states[key['id']] = distance
            

class SimEnv:
    def __init__(self, num_agents, goals, num_timesteps, dt):
        self.num_agents = num_agents
        self.num_goals = goals.shape[0]
        self.num_timesteps = num_timesteps
        self.dt = dt
    def calculate_kl_divergence(p, q):
        """Calculate KL divergence between two probability distributions."""
        return np.sum(p * np.log(p / q + np.exp(-16)))
    def calculate_shannon_entropy(p):
        """Calculate Shannon entropy of a probability distribution."""
        return -np.sum(p * np.log(p))
    def calculate_likelihood(self, agent):
        """Calculate likelihood of agent's measurements given each goal."""
        for id in range(self.num_goals):
            if agent.type == 'A':
                distance = np.linalg.norm(agent.measured_states[id]-agent.state)
                agent.likelihood[id] = np.exp(-distance)
            else:
                azimuth = np.arctan2(agent.measured_states[id][1]-agent.state[1],agent.measured_states[id][0]-agent.state[0])
                agent.likelihood[id] = np.exp(-azimuth)
        return agent.likelihood
                

    
class agentSystem:
    def __init__(self, agents):
        self.agents = agents
    def update(self,agents):
        self.agents = agents
    def simulate_observations(self, agents):
        """Simulate noisy observations of agent positions given agents structure."""
        observations = {}
        for id1,agent1 in enumerate(agents):
            id1Obs = []
            for id2,agent2 in enumerate(agents):
                if agent1.type == 'A':
                    distance = np.linalg.norm(agent2.state-agent1.state)
                    observation = np.linalg.norm(agent2.state-agent1.state) + np.random.normal(0, np.sqrt(distance),1)
                else:
                    azimuth = np.arctan2(agent2.state[1]-agent1.state[1],agent2.state[0]-agent1.state[0])
                    observation = azimuth + np.random.uniform(0, azimuth,1)
                id1Obs.append(observation)
            observations[id1] = {'id': id1, 'obs': id1Obs, 'type': agent1.type}
        return observations
        

In [85]:
cmap = Set1_9.mpl_colors
# Re-define the environment and simulation parameters here
goals = np.array([[1, 1], [10, 10]], dtype=float)  # Goal positions
agent_positions = np.array([[9, 1, np.pi/2], [1, 9, np.pi/2]], dtype=float)  # Initial agent positions
agent_types = ['A', 'A']  # Agent types for differentiating their types
velocity_options = [0, 0.5, 1.0]  # Velocity options for the agents
heading_options = [0, np.pi/4, np.pi, 3*np.pi/4]  # Heading options (radians)
observation_error_std = 3.0  # Observation noise standard deviation
max_iterations = 100  # Maximum number of iterations
num_agents = agent_positions.shape[0]  # Number of agents

agents = []  # List to store agent objects
agent_dict = {}  # Dictionary to store agent system objects for each agent
# Initialize agents
for i in range(num_agents):
    agents.append(Agent(np.zeros((num_agents, 1)), agent_positions[i], agent_types[i], velocity_options, heading_options, goals))
# Initialize simulation environment
env = SimEnv(num_agents, goals, max_iterations, 0.1)  # Initialize simulation environment
# Initialize agent system
for i in range(num_agents):
    agent_dict[i] = {'ID': i, 'Agent': agents[i], 'AgentSystem': agentSystem(agents)}




In [86]:
# Simulate noisy observation of the other agent's position
for id in agent_dict.keys():
    agent_sys = agent_dict[id]['AgentSystem']
    observations = agent_sys.simulate_observations(agent_sys.agents)
    measurements = get_measurements(observations)
        




{0: {'id': 0, 'obs': [array([0.]), array([10.30829063])], 'type': 'A'}, 1: {'id': 1, 'obs': [array([9.59780487]), array([0.])], 'type': 'A'}}
{0: {'id': 0, 'obs': [array([0.]), array([6.47196933])], 'type': 'A'}, 1: {'id': 1, 'obs': [array([14.3915028]), array([0.])], 'type': 'A'}}


In [None]:

for goal in goals:
    # Initialize a score for how attainable each goal seems for both agents
    goal_scores = []
            
    for velocity in velocity_options:
        for heading in heading_options:
            predicted_position = predict_agent_position(agent_positions[agent_id], velocity, heading)

            # Estimate how both agents are aligned with reaching the current goal
            distance_to_goal = np.linalg.norm(predicted_position - goal)
            distance_other_to_goal = np.linalg.norm(other_agent_observed_position - goal)

            # Use the sum of both distances as a simple score for this action's alignment with the goal
            goal_alignment_score = distance_to_goal + distance_other_to_goal
            
            goal_scores.append((goal_alignment_score, velocity, heading))

    # Choose the action (for the current goal) that minimizes the combined distance
    best_action_for_goal = min(goal_scores, key=lambda x: x[0])

    # Update best action if this goal is more attainable than previous best
    if best_action_for_goal[0] < best_score:
        best_score = best_action_for_goal[0]
        best_action = best_action_for_goal[1], best_action_for_goal[2]

return best_action




# ani.save(pwd + "videos/two_goals_choice" + current_time + ".mp4", writer='ffmpeg', fps=3, dpi=300)
# print("Image saved as: ", pwd + "videos/two_goals_choice" + current_time + ".mp4")


In [None]:
def make_decision(agent_id, agent_positions):
    """Agent decision-making based on active inference to encourage convergence on a shared goal."""
    best_action = None
    best_score = np.inf

    # Simulate noisy observation of the other agent's position
    for id in agent_dict.keys():
        print(agent_dict[id])
        

    

    for goal in goals:
        # Initialize a score for how attainable each goal seems for both agents
        goal_scores = []
                
        for velocity in velocity_options:
            for heading in heading_options:
                predicted_position = predict_agent_position(agent_positions[agent_id], velocity, heading)

                # Estimate how both agents are aligned with reaching the current goal
                distance_to_goal = np.linalg.norm(predicted_position - goal)
                distance_other_to_goal = np.linalg.norm(other_agent_observed_position - goal)

                # Use the sum of both distances as a simple score for this action's alignment with the goal
                goal_alignment_score = distance_to_goal + distance_other_to_goal
                
                goal_scores.append((goal_alignment_score, velocity, heading))

        # Choose the action (for the current goal) that minimizes the combined distance
        best_action_for_goal = min(goal_scores, key=lambda x: x[0])

        # Update best action if this goal is more attainable than previous best
        if best_action_for_goal[0] < best_score:
            best_score = best_action_for_goal[0]
            best_action = best_action_for_goal[1], best_action_for_goal[2]
    
    return best_action




# ani.save(pwd + "videos/two_goals_choice" + current_time + ".mp4", writer='ffmpeg', fps=3, dpi=300)
# print("Image saved as: ", pwd + "videos/two_goals_choice" + current_time + ".mp4")


In [31]:
prior = np.array([0.5,0.5])
obs1 = np.array([10,20])
obs2 = np.array([np.pi/4,np.pi/2])
likelihood1 = softmax(-obs1)
print(likelihood1)
likelihood2 = softmax(-obs2)
print(likelihood2)
joint = likelihood1 * likelihood2 
posterior = softmax(joint + np.log(prior))
print(posterior)

print(softmax([0,0]))



[9.99954602e-01 4.53978687e-05]
[0.68684237 0.31315763]
[0.66525402 0.33474598]
[0.5 0.5]


Here is the plotting arguments

In [None]:

# Initialize figure for plotting
fig, ax = plt.subplots()
plt.xlim(-5, 15)
plt.ylim(-5, 15)
# Paths (with smaller line width)
agent_paths = [ax.plot([], [], 'o-', markersize=3, linewidth=1, alpha=0.5, color=cmap[i])[0] for i in range(2)]
# Current positions (with larger markers)
agent_markers = [ax.plot([], [], 'o', markersize=10, color=cmap[i])[0] for i in range(2)]
goal_plots = [ax.plot(goal[0], goal[1], 'x', markersize=10, color='purple')[0] for goal in goals]  # Plot goals


def init():
    """Initialize the background of the plot."""
    for agent_path, agent_marker in zip(agent_paths, agent_markers):
        agent_path.set_data([], [])
        agent_marker.set_data([], [])
    return agent_paths + agent_markers

def update(frame):
    """Update the plot for each frame."""    
    decisions = [make_decision(agent_id, agent_positions) for agent_id in range(2)]
    
    # Update positions based on decisions
    for agent_id, (velocity, heading) in enumerate(decisions):
        dx = velocity * np.cos(heading)
        dy = velocity * np.sin(heading)
        agent_positions[agent_id] += np.array([dx, dy])
    
    # Update plot data
    for agent_id, (agent_path, agent_marker) in enumerate(zip(agent_paths, agent_markers)):
        xdata, ydata = agent_path.get_data()
        xnew, ynew = agent_positions[agent_id]
        xdata = np.append(xdata, xnew)
        ydata = np.append(ydata, ynew)
        agent_path.set_data(xdata, ydata)
        agent_marker.set_data(xnew, ynew)
    
    return agent_paths + agent_markers

def run_simulation(max_iterations=100):
    """Run the simulation until both agents converge to the same goal or max iterations reached."""
    current_positions = np.copy(agent_positions)
    
    for iteration in range(max_iterations):
        decisions = [make_decision(agent_id, current_positions) for agent_id in range(2)]
        
        # Update agent positions based on their decisions
        for agent_id, (velocity, heading) in enumerate(decisions):
            dx = velocity * np.cos(heading)
            dy = velocity * np.sin(heading)
            current_positions[agent_id] += np.array([dx, dy])
        
        # Check if agents have converged to the same goal
        distances_to_goals = [np.linalg.norm(goals - pos, axis=1) for pos in current_positions]
        goal_reached_by_agents = [np.argmin(distances) for distances in distances_to_goals]
        distances_to_selected_goal = [np.min(distances) for distances in distances_to_goals]
        
        if (np.array(distances_to_selected_goal)<1).all():
            print(distances_to_selected_goal)
            print(f"Agents have converged to Goal {goal_reached_by_agents[0]} after {iteration + 1} iterations.")
            return current_positions, goal_reached_by_agents[0], iteration

    print("Agents did not converge to the same goal within the maximum iterations.")
    return current_positions, None, iteration

# Run the simulation
final_positions, goal_converged, num_frames = run_simulation()
# Create animation
ani = FuncAnimation(fig, update, frames=range(num_frames), init_func=init, blit=True, repeat=True)

# Save the animation as a video
current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
HTML(ani.to_html5_video()) # Use an interactive backend for animation