In [None]:
!pip install inferactively-pymdp

In [None]:
import os
import sys
import pathlib
import numpy as np
from pymdp.agent import Agent
from pymdp import utils, maths
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.cm as cm
import random
import time

# Pick one random grid cell that is not already taken
def generate_unique_tuple(exclude_set, grid_shape=(6, 6)):
    """Return a random grid cell that is not contained in ``exclude_set``.

    Args:
        exclude_set: Collection of ``(x, y)`` tuples that must not be
            returned. It is not modified; callers add the result themselves
            if later draws have to avoid it.
        grid_shape: ``(n_x, n_y)`` extent of the grid. Coordinates are drawn
            from ``0..n_x-1`` and ``0..n_y-1``. Defaults to a 6x6 grid.

    Returns:
        A tuple ``(x, y)`` chosen uniformly among the cells not excluded.

    Raises:
        ValueError: If every cell of the grid is excluded (the previous
            rejection-sampling loop would spin forever in that case).
    """
    free_cells = [
        (x, y)
        for x in range(grid_shape[0])
        for y in range(grid_shape[1])
        if (x, y) not in exclude_set
    ]
    if not free_cells:
        raise ValueError("no free grid cell left to sample")
    return random.choice(free_cells)

results = []
for loops_iter in range(0, 10):
  # Cells that may not host a destination or a cheat sheet. (0,0) is the
  # start cell and (2,0) the hint cell; the other four are reserved as well.
  seen_tuples = {(0, 0), (2, 0), (1, 1), (2, 2), (1, 0), (0, 1)}

  # Draw eight distinct random cells. Each draw is added to the exclusion set
  # before the next one is made (the eighth draw is never added).
  drawn_cells = []
  for _ in range(8):
    seen_tuples.update(drawn_cells[-1:])
    drawn_cells.append(generate_unique_tuple(seen_tuples))
  tuple1, tuple2, tuple3, tuple4, tuple5, tuple6, tuple7, tuple8 = drawn_cells


  # Define the world
  world_size = [6, 6]
  num_grid_points = np.prod(world_size)  # total number of grid locations (rows x columns)

  # Lookup table: position in this list is the flat state index of the
  # (row, col) tuple stored there (row-major order).
  grid = np.arange(num_grid_points).reshape(world_size)
  grid_location_lookup = list(np.ndindex(grid.shape))


  # Hidden state factors: agent location, which cheat sheet is the real one,
  # and which destination holds the reward. tuple4 is drawn but not used.
  cheatsheat_names = ['CS1', 'CS2', 'CS3', 'CS4']
  cheatsheat_locations = [tuple5, tuple6, tuple7, tuple8]

  destination_names = ["A", "B", "C"]
  destination_locations = [tuple1, tuple2, tuple3]

  hidden_state_combinations = [num_grid_points, len(cheatsheat_locations), len(destination_names)]


  # Observation modalities: location, cheat-sheet hint, destination hint, reward
  cheatsheat_hint_location = (2, 0)
  cheatsheat_hints = ['Nothing'] + cheatsheat_names
  destination_hints = ['Nothing', 'A', 'B', 'C']
  destination_reward_names = ['Nothing', 'Cheese', 'Tom']  # reward could be nothing, positive or negative

  observable_state_combinations = [num_grid_points, len(cheatsheat_hints), len(destination_hints), len(destination_reward_names)]


  # One likelihood array per modality, indexed as
  # [observation, location, true cheat sheet, rewarding destination].
  likelihood_matrices = utils.obj_array_zeros(
      [[o_dim] + hidden_state_combinations for o_dim in observable_state_combinations]
  )

  # Modality 0 (location): the observed location always equals the hidden
  # location, whatever the other two hidden factors are.
  identity_4d = np.eye(num_grid_points)[:, :, np.newaxis, np.newaxis]
  likelihood_matrices[0] = np.tile(
      identity_4d, (1, 1, hidden_state_combinations[1], hidden_state_combinations[2])
  )

  # Modality 1 (cheat-sheet hint): 'Nothing' everywhere except at the hint
  # cell, where the observation names the true cheat sheet.
  hint_cell = grid_location_lookup.index(cheatsheat_hint_location)
  likelihood_matrices[1][0, ...] = 1.0
  for sheet_idx in range(len(cheatsheat_locations)):
    likelihood_matrices[1][0, hint_cell, sheet_idx, :] = 0.0
    likelihood_matrices[1][sheet_idx + 1, hint_cell, sheet_idx, :] = 1.0


  # Modality 2 (destination hint): 'Nothing' everywhere except when standing
  # on the true cheat sheet, where the observation names the rewarding destination.
  likelihood_matrices[2][0, ...] = 1.0
  for sheet_idx, cheatsheat_loc in enumerate(cheatsheat_locations):
    sheet_cell = grid_location_lookup.index(cheatsheat_loc)
    likelihood_matrices[2][0, sheet_cell, sheet_idx, :] = 0.0  # no 'Nothing' on the true sheet
    for dest_idx in range(len(destination_names)):
      likelihood_matrices[2][dest_idx + 1, sheet_cell, sheet_idx, dest_idx] = 1.0

  # Modality 3 (reward): 'Nothing' off the destinations; on a destination the
  # observation is 'Cheese' if it is the rewarding one, otherwise 'Tom'.
  likelihood_matrices[3][0, ...] = 1.0
  for visited_idx, destination_loc in enumerate(destination_locations):
    loc_index = grid_location_lookup.index(destination_loc)
    likelihood_matrices[3][0, loc_index, :, :] = 0.0
    for rewarding_idx in range(len(destination_names)):
      outcome = 1 if rewarding_idx == visited_idx else 2
      likelihood_matrices[3][outcome, loc_index, :, rewarding_idx] = 1.0


  # Priors are uniform for every hidden factor except location, which is
  # known to start at (0,0).
  initial_prior_distribution = utils.obj_array_uniform(hidden_state_combinations)
  initial_prior_distribution[0] = utils.onehot(grid_location_lookup.index((0,0)), num_grid_points)

  # Preferences are zero for every observation except the reward modality:
  # positive for 'Cheese', negative for 'Tom'.
  state_preferences = utils.obj_array_zeros(observable_state_combinations)
  state_preferences[3][1] = 2.0   # make the agent want to encounter the "Cheese" observation level
  state_preferences[3][2] = -4.0  # make the agent not want to encounter the "Tom" observation level


  # B holds the transition tables, one per hidden factor, indexed as
  # [next state, current state, action].
  num_controls = [5, 1, 1]
  B_f_shapes = [[ns, ns, num_controls[f]] for f, ns in enumerate(hidden_state_combinations)]
  B = utils.obj_array_zeros(B_f_shapes)
  actions = ["UP", "DOWN", "LEFT", "RIGHT", "STAY"]

  # (row, col) displacement of each action; moves off the grid are clamped,
  # so the agent stays where it is when it walks into a wall.
  displacement = {"UP": (-1, 0), "DOWN": (1, 0), "LEFT": (0, -1), "RIGHT": (0, 1), "STAY": (0, 0)}
  last_row = world_size[0] - 1
  last_col = world_size[1] - 1
  for action_id, action_label in enumerate(actions):
    d_row, d_col = displacement[action_label]
    for curr_state, (row, col) in enumerate(grid_location_lookup):
      target = (min(max(row + d_row, 0), last_row), min(max(col + d_col, 0), last_col))
      next_state = grid_location_lookup.index(target)
      B[0][next_state, curr_state, action_id] = 1.0

  # The cheat-sheet and destination factors never change: identity transitions
  B[1][:,:,0] = np.eye(hidden_state_combinations[1])
  B[2][:,:,0] = np.eye(hidden_state_combinations[2])



  # Environment in which the agent operates.
  # step() applies one action and returns the observations for the new cell.
  # reset() moves the agent back to the start cell and returns 'Nothing' hints.
  class Environment():
      """Grid world that turns the agent's actions into observations.

      The class reads these names from the enclosing script at call time:
      ``world_size``, ``cheatsheat_names``, ``cheatsheat_locations``,
      ``destination_names``, ``destination_locations`` and
      ``destination_hints``.
      """

      # (row, col) displacement of every supported action
      _MOVES = {
          "UP": (-1, 0),
          "DOWN": (1, 0),
          "LEFT": (0, -1),
          "RIGHT": (0, 1),
          "STAY": (0, 0),
      }

      def __init__(self,starting_loc = (0,0), cheatsheat_hint_location = (2, 0), cheatsheat_name = 'CS1', destination_name = 'A'):
          """Store the true world configuration and place the agent at the start.

          Args:
              starting_loc: ``(row, col)`` cell the agent starts from.
              cheatsheat_hint_location: Cell that reveals which cheat sheet is real.
              cheatsheat_name: Name of the real cheat sheet (entry of ``cheatsheat_names``).
              destination_name: Name of the destination that yields 'Cheese'.
          """
          self.starting_location = starting_loc
          self.current_location = self.starting_location

          self.cheatsheat_hint_location = cheatsheat_hint_location
          self.cheatsheat_name = cheatsheat_name
          self.cheatsheat_location = cheatsheat_locations[cheatsheat_names.index(self.cheatsheat_name)]
          self.destination_name = destination_name
          print(f'Starting location is {self.starting_location}, Reward condition is {self.destination_name}, cue is located in {self.cheatsheat_name}')

      def step(self,action_label):
          """Move the agent by one action and return what it observes.

          Args:
              action_label: One of "UP", "DOWN", "LEFT", "RIGHT", "STAY".

          Returns:
              Tuple ``(location, cheat-sheet hint, destination hint, reward)``
              where location is a ``(row, col)`` tuple and the rest are strings.

          Raises:
              ValueError: If ``action_label`` is not a supported action.
          """
          if action_label not in self._MOVES:
              raise ValueError(
                  f"Unknown action {action_label!r}; expected one of {sorted(self._MOVES)}"
              )

          # Apply the displacement, clamping at the walls so that an
          # off-grid move leaves the agent where it is.
          d_row, d_col = self._MOVES[action_label]
          row, col = self.current_location
          row = min(max(row + d_row, 0), world_size[0] - 1)
          col = min(max(col + d_col, 0), world_size[1] - 1)
          self.current_location = (row, col) # store the new grid location

          location_observation = self.current_location

          # The hint cell reveals which cheat sheet is the real one
          if self.current_location == self.cheatsheat_hint_location:
              cheatsheat_hint_observation = self.cheatsheat_name
          else:
              cheatsheat_hint_observation = 'Nothing'

          # The real cheat sheet reveals which destination holds the cheese
          if self.current_location == self.cheatsheat_location:
              destination_hint_observation = destination_hints[destination_names.index(self.destination_name)+1]
          else:
              destination_hint_observation = 'Nothing'

          # A destination yields 'Cheese' if it is the rewarding one, 'Tom'
          # otherwise; every other cell yields 'Nothing'.
          if self.current_location in destination_locations:
              visited_name = destination_names[destination_locations.index(self.current_location)]
              if visited_name == self.destination_name:
                  destination_reward_name_observation = 'Cheese'
              else:
                  destination_reward_name_observation = 'Tom'
          else:
              destination_reward_name_observation = 'Nothing'

          return location_observation, cheatsheat_hint_observation, destination_hint_observation, destination_reward_name_observation

      def reset(self):
          """Put the agent back on the start cell.

          Returns:
              The start location together with 'Nothing' for the two hints
              and the reward.
          """
          self.current_location = self.starting_location
          print(f'Re-initialized location to {self.starting_location}')
          location_observation = self.current_location
          cheatsheat_hint_observation = 'Nothing'
          destination_hint_observation = 'Nothing'
          destination_reward_name_observation = 'Nothing'

          return location_observation, cheatsheat_hint_observation, destination_hint_observation, destination_reward_name_observation



  # Instantiate the agent and the environment. The agent's constructor takes
  # the likelihood_matrices (A), the transition matrices (B), the
  # state_preferences (C) and the initial_prior_distribution (D).
  fristonian_jerry = Agent(A = likelihood_matrices, B = B, C = state_preferences, D = initial_prior_distribution, policy_len = 4)

  # Fix the hint cell, the real cheat sheet and the rewarding destination
  env = Environment(starting_loc = (0,0), cheatsheat_hint_location = (2, 0), cheatsheat_name = 'CS4', destination_name = 'B')

  location_observation, cheatsheat_hint_observation, destination_hint_observation, destination_reward_name_observation = env.reset()


  history_of_locs = [location_observation]
  # Observations are handed to the agent as one index per modality
  obs = [grid_location_lookup.index(location_observation), cheatsheat_hints.index(cheatsheat_hint_observation), destination_hints.index(destination_hint_observation), destination_reward_names.index(destination_reward_name_observation)]

  # Active inference loop
  T = 12 # maximum number of timesteps
  steps_taken = 0 # number of actions actually executed in this run
  start_time = time.time()
  for t in range(T):

      qs = fristonian_jerry.infer_states(obs)

      fristonian_jerry.infer_policies()
      chosen_action_id = fristonian_jerry.sample_action()

      # Entry 0 of the sampled action is the movement action
      movement_id = int(chosen_action_id[0])

      choice_action = actions[movement_id]

      location_observation, cheatsheat_hint_observation, destination_hint_observation, destination_reward_name_observation = env.step(choice_action)

      obs = [grid_location_lookup.index(location_observation), cheatsheat_hints.index(cheatsheat_hint_observation), destination_hints.index(destination_hint_observation), destination_reward_names.index(destination_reward_name_observation)]

      history_of_locs.append(location_observation)

      # t is zero-based, so after this action t + 1 steps have been taken
      steps_taken = t + 1

      # The run ends as soon as a destination is reached, good or bad
      if destination_reward_name_observation in ("Cheese", "Tom"):
        break

  # One record per run: [steps taken, final reward observation, elapsed seconds]
  results.append([steps_taken, destination_reward_name_observation, time.time()-start_time])
  print(results)



In [None]:
# Aggregate the per-run records; each record is
# [step value, reward label, elapsed seconds].
times = []
steps = []
rewards = []

cheeses = 0
cheese_times_total = 0
cheese_steps_total = 0

for step_value, reward_label, elapsed in results:
  times.append(elapsed)
  steps.append(step_value)
  rewards.append(reward_label)
  if reward_label == 'Cheese':
    cheeses += 1
    cheese_times_total += elapsed
    cheese_steps_total += step_value

# Use the number of recorded runs rather than a fixed 10, and print nan for
# any mean that is undefined (no runs at all, or no run that found the cheese).
num_runs = len(results)
undefined = float('nan')

print(cheeses / num_runs if num_runs else undefined)            # success rate
print(sum(times) / num_runs if num_runs else undefined)         # mean time per run
print(cheese_steps_total / cheeses if cheeses else undefined)   # mean step value of successful runs

import pandas as pd
results_df = pd.DataFrame({"Reward": rewards, "Time (s)": times, "Steps": steps})
results_df.to_excel("results.xlsx")

In [None]:
# Plot elapsed time and step count per run, side by side.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# The x axis is derived from the data so the plot also works when the number
# of runs is not 10 (a fixed range would raise a shape mismatch).
ax1.plot(range(1, len(times) + 1), times, marker='o', linestyle='-', color='orange')
ax1.set_xlabel('Iteration')
ax1.set_ylabel('Time Taken')
ax1.set_title('Time Taken (s) during each iteration')
ax1.grid(True)

ax2.plot(range(1, len(steps) + 1), steps, marker='o', linestyle='-', color='blue')
ax2.set_xlabel('Iteration')
ax2.set_ylabel('Steps Taken')
ax2.set_title('Steps Taken during each iteration')
ax2.grid(True)

plt.tight_layout()

plt.show()

In [None]:
#Visualization using pygame
# set loops_iter range to 1 before running
# The picture shows the most recent run: it uses the locations, the env and
# the path left behind by the last iteration of the simulation loop.

import pygame
import numpy as np

# Take the true destination and cheat sheet from the environment instead of
# repeating the names it was constructed with.
destination_location = destination_locations[destination_names.index(env.destination_name)]
cheatsheat_location = cheatsheat_locations[cheatsheat_names.index(env.cheatsheat_name)]

BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (200, 200, 200)

pygame.init()

# Draw onto an off-screen surface; the result is saved to a PNG at the end
window_size = (800, 600)
screen = pygame.Surface(window_size)
screen.fill(WHITE)


# Locations are (row, col): columns run along the x axis and rows along the
# y axis, so the width is split by the column count and the height by the
# row count.
cell_width = window_size[0] // (world_size[1])
cell_height = window_size[1] // (world_size[0])
for row in range(world_size[0]):
    for col in range(world_size[1]):
        pygame.draw.rect(screen, GRAY, (col * cell_width, row * cell_height, cell_width, cell_height), 1)

# Fill the cell that reveals the cheat-sheet hint
hint_row, hint_col = cheatsheat_hint_location
pygame.draw.rect(screen, GRAY, (hint_col * cell_width, hint_row * cell_height, cell_width, cell_height))

tom_img = pygame.image.load("/content/Tom.webp")
tom_img = pygame.transform.scale(tom_img, (cell_width, cell_height))

jerry_img = pygame.image.load("/content/jerry.png")
jerry_img = pygame.transform.scale(jerry_img, (cell_width, cell_height))

cheatsheat_img = pygame.image.load("/content/mapicon.jpeg")
cheatsheat_img = pygame.transform.scale(cheatsheat_img, (cell_width, cell_height))

fake_cheatsheat_img = pygame.image.load("/content/mapicon_blurred.png")
fake_cheatsheat_img = pygame.transform.scale(fake_cheatsheat_img, (cell_width, cell_height))

cheese_img = pygame.image.load("/content/cheese.webp")
cheese_img = pygame.transform.scale(cheese_img, (cell_width, cell_height))

# Jerry is drawn on the cell the run started from
start_row, start_col = env.starting_location
screen.blit(jerry_img, (start_col * cell_width, start_row * cell_height))

# Cheese on the rewarding destination, Tom on the others
for d in destination_locations:
  if d == destination_location:
    screen.blit(cheese_img, (d[1] * cell_width, d[0] * cell_height))
  else:
    screen.blit(tom_img, (d[1] * cell_width, d[0] * cell_height))

# Sharp map icon on the real cheat sheet, blurred icon on the decoys
for c in cheatsheat_locations:
  if c == cheatsheat_location:
    screen.blit(cheatsheat_img, (c[1] * cell_width, c[0] * cell_height))
  else:
    screen.blit(fake_cheatsheat_img, (c[1] * cell_width, c[0] * cell_height))

# jerry's path, drawn through the centres of the visited cells
all_locations = np.vstack(history_of_locs).astype(float)
if len(all_locations) > 1:
    pygame.draw.lines(screen, RED, False, [(int(loc[1]) * cell_width + cell_width // 2, int(loc[0]) * cell_height + cell_height // 2) for loc in all_locations], 2)


# Pixel data with the first two axes swapped (array3d's leading axis is moved
# to second place)
image_data = pygame.surfarray.array3d(screen)
image_data = np.moveaxis(image_data, 0, 1)
pygame.image.save(screen, 'grid_visualization.png')

pygame.quit()
