In [1]:
# __future__ import should always be first
from __future__ import annotations

# Standard library imports
from collections import defaultdict

# Third-party imports
import torch
import numpy as np
import torch.nn as nn
from torch.distributions.categorical import Categorical

# Gymnasium & Minigrid imports
import gymnasium as gym  # Correct way to import Gymnasium
from minigrid.core.constants import COLOR_NAMES
from minigrid.core.constants import DIR_TO_VEC
from minigrid.core.grid import Grid
from minigrid.core.actions import Actions
from minigrid.core.mission import MissionSpace
from minigrid.core.world_object import Door, Goal, Key, Wall
from minigrid.manual_control import ManualControl
from minigrid.minigrid_env import MiniGridEnv
from gymnasium.utils.play import play
import pandas as pd
# Visualization imports
import matplotlib.pyplot as plt


In [2]:

class SimpleEnv(MiniGridEnv):
    def __init__(
            self, 
            size=10, 
            agent_start_pos=(1, 8), 
            agent_start_dir=0, 
            max_steps=256, 
            **kwargs,
    ):
        self.agent_start_pos = agent_start_pos
        self.agent_start_dir = agent_start_dir
        self.goal_pos = (8, 1)
        
        
        
        mission_space = MissionSpace(mission_func=self._gen_mission)

        super().__init__(
            mission_space=mission_space,
            grid_size=size,
            max_steps=max_steps,
            **kwargs,
        )

        self.action_space = gym.spaces.Discrete(3)
    @staticmethod
    def _gen_mission():
        return "Find the shortest path"

    def _gen_grid(self, width, height):
        #create gird
        self.grid = Grid(width, height)
        #place barrier
        self.grid.wall_rect(0, 0, width, height)
        #place goal
        self.put_obj(Goal(), 8, 1)
        #place walls
        for i in range(1, width // 2):
            self.grid.set(i, width - 4, Wall())
            self.grid.set(i + width // 2 - 1, width - 7, Wall())
        #place agent
        if self.agent_start_pos is not None:
            self.agent_pos = self.agent_start_pos #check this
            self.agent_dir = self.agent_start_dir
        else:
            self.place_agent()

        self.mission = "find the shortest path"
    
    def count_states(self):
        free_cells = sum(1 for x in range(self.grid.width)
                      for y in range(self.grid.height)
                      if not self.grid.get(x, y)) * 4
        return free_cells 


In [3]:
class Qlearning:
    def __init__(
        self,
        env,
        learning_rate = 0.9,
        discount_factor = 0.9,
        epsilon = 0.5,
        epochs = 1000
         
    ):
        self.env = env # MiniGrid environment called from the class Minigrid
        self.num_states = np.int64(env.count_states())
        self.num_actions = env.action_space.n
        self.Q_table = np.zeros((((env.width -2) * (env.height -2) *4), self.num_actions)) # Q table has goal states in it as well
        # print(f"Q table shape: {self.Q_table.shape}")
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor # discount factor
        self.epsilon =  epsilon # exploration probability
        self.epochs = epochs


        self.allowed_state_idx = self.find_state_indexes(env)
        # print(f"allowed state indexes: {self.allowed_state_idx}")


    
    def position_to_state_index(self, tuple_position = None): 
        grid_width = self.env.grid.width -2
        if tuple_position is None:
            direction = self.env.agent_dir
            x, y = self.env.agent_pos

        else:
            if not isinstance(tuple_position, tuple) or len(tuple_position) != 3:
                raise ValueError(f"Invalid position format: {tuple_position}")
            x, y, direction = tuple_position
        return np.int64(((y-1) * grid_width + (x-1)) * 4 + direction) 
    
    def state_index_to_position(self, state_idx):
        """Converts a scalar state index back into (x, y, direction)."""
        grid_width = self.env.grid.width-2
        
        direction = state_idx % 4
        linear_idx = state_idx // 4

        y, x = divmod(linear_idx, grid_width)  # Convert to (x, y)
        
        return x+1, y+1, direction
    
    def find_state_indexes(self, env):
        """Counts all states except walls and barriers"""
        state_indexes_list = []
        for x in range(1, env.grid.width-1):
            for y in range(1, env.grid.height-1):
                if env.grid.get(x, y) is None: #grabs all empty spaces
                    for direction in range(4):
                        state_index = self.position_to_state_index((x, y, direction))
                        state_indexes_list.append(state_index)
        return state_indexes_list        
    
    
    def train(self, epochs):
        goal_states = [self.position_to_state_index((8, 1, d)) for d in range(4)] #goal state index
        
        for epoch in range(epochs):
            current_state = np.random.choice(self.allowed_state_idx)
            x,y,dir = self.state_index_to_position(current_state)
            self.env.agent_pos = (x,y)
            self.env.agent_dir = dir
            print(f"Epoch {epoch}: Starting state: {current_state}")
            print(f"Epoch {epoch}: Starting position: {self.state_index_to_position(current_state)}")

            while current_state not in goal_states:
                # #replace current index with its table index
                # table_index = self.state_to_index[current_state]
                # Epsilon-greedy action selection
                if np.random.rand() < self.epsilon:
                    action = self.env.action_space.sample()
                    print(f"random action: {action}, exploring")
                else:
                    action = np.argmax(self.Q_table[current_state])
                    print(f"greedy action: {action}, exploiting")


                #transition to the next state
                next_obs, _, done, _, _ = self.env.step(action)

                # print(f"next_observation: {next_obs}")
                next_state = self.position_to_state_index()
                print(f"next_state: {next_state}")
                print(f"next  step: Agent Pos: {self.env.agent_pos}, Dir: {self.env.agent_dir}")
                #reward (-1 for each step thats not the gaol)
                reward = 0 if next_state in goal_states else -1

                self.Q_table[current_state, action] += self.learning_rate * (reward + self.discount_factor * np.max(self.Q_table[next_state]) - self.Q_table[current_state, action])
                #update state
                if next_state in goal_states:
                    break
                # assert(False)
                current_state = next_state
        
        
    

    def run_policy_2(self):
        """Run the environment using the learned policy from a Q-table."""

        self.env.reset()[0]  # Reset environment
        current_state = self.position_to_state_index()  # Convert starting position to index
        done = False
        step_count = 0  # Track steps to prevent infinite loops

        while not done and step_count < 100:  # Prevent infinite loops
            # Debugging: Print current state and Q-values
            print(f"Step {step_count}: State {current_state}")
            print(f"Q-values: {self.Q_table[current_state]}")
            
            action = np.argmax(self.Q_table[current_state])  # Choose best action
            print(f"Chosen action: {action}")

            # Before taking the step, print agent's current position
            print(f"Before step: Agent Pos: {self.env.agent_pos}, Dir: {self.env.agent_dir}")

            next_obs, _, done, _, _ = self.env.step(action)  # Take action

            # After step, print agent's new position
            print(f"After step: Agent Pos: {self.env.agent_pos}, Dir: {self.env.agent_dir}")

            next_state = self.position_to_state_index()  # Convert new state

            # Detect if the agent is looping in the same state
            if next_state == current_state:
                print(f"⚠️ Warning: Agent is stuck! Current state {current_state} is the same as next state {next_state}.")
                break  # Prevent infinite loop

            self.env.render()  # Visualize movement
            current_state = next_state  # Update current state
            step_count += 1





Q Learning With Greedy Policy Normlization

In [None]:
class Qlearning:
    def __init__(
        self,
        env,
        learning_rate = 0.9,
        discount_factor = 0.9,
        epsilon = 0.5,
        epochs = 1000
         
    ):
        self.env = env # MiniGrid environment called from the class Minigrid
        self.num_states = np.int64(env.count_states())
        self.num_actions = env.action_space.n
        self.Q_table = np.zeros((((env.width -2) * (env.height -2) *4), self.num_actions)) # Q table has goal states in it as well
        # print(f"Q table shape: {self.Q_table.shape}")
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor # discount factor
        self.epsilon =  epsilon # exploration probability
        self.epochs = epochs


        self.allowed_state_idx = self.find_state_indexes(env)
        # print(f"allowed state indexes: {self.allowed_state_idx}")


    
    def position_to_state_index(self, tuple_position = None): 
        grid_width = self.env.grid.width -2
        if tuple_position is None:
            direction = self.env.agent_dir
            x, y = self.env.agent_pos

        else:
            if not isinstance(tuple_position, tuple) or len(tuple_position) != 3:
                raise ValueError(f"Invalid position format: {tuple_position}")
            x, y, direction = tuple_position
        return np.int64(((y-1) * grid_width + (x-1)) * 4 + direction) 
    
    def state_index_to_position(self, state_idx):
        """Converts a scalar state index back into (x, y, direction)."""
        grid_width = self.env.grid.width-2
        
        direction = state_idx % 4
        linear_idx = state_idx // 4

        y, x = divmod(linear_idx, grid_width)  # Convert to (x, y)
        
        return x+1, y+1, direction
    
    def find_state_indexes(self, env):
        """Counts all states except walls and barriers"""
        state_indexes_list = []
        for x in range(1, env.grid.width-1):
            for y in range(1, env.grid.height-1):
                if env.grid.get(x, y) is None: #grabs all empty spaces
                    for direction in range(4):
                        state_index = self.position_to_state_index((x, y, direction))
                        state_indexes_list.append(state_index)
        return state_indexes_list        
    
    
    def train(self, epochs):
        goal_states = [self.position_to_state_index((8, 1, d)) for d in range(4)] #goal state index
        
        for epoch in range(epochs):
            current_state = np.random.choice(self.allowed_state_idx)
            x,y,dir = self.state_index_to_position(current_state)
            self.env.agent_pos = (x,y)
            self.env.agent_dir = dir
            print(f"Epoch {epoch}: Starting state: {current_state}")
            print(f"Epoch {epoch}: Starting position: {self.state_index_to_position(current_state)}")

            while current_state not in goal_states:
                # #replace current index with its table index
                # table_index = self.state_to_index[current_state]
                # Epsilon-greedy action selection
                if np.random.rand() < self.epsilon:
                    action = self.env.action_space.sample()
                    print(f"random action: {action}, exploring")
                else:
                    action = np.argmax(self.Q_table[current_state])
                    print(f"greedy action: {action}, exploiting")


                #transition to the next state
                next_obs, _, done, _, _ = self.env.step(action)

                # print(f"next_observation: {next_obs}")
                next_state = self.position_to_state_index()
                print(f"next_state: {next_state}")
                print(f"next  step: Agent Pos: {self.env.agent_pos}, Dir: {self.env.agent_dir}")
                #reward (-1 for each step thats not the gaol)
                reward = 0 if next_state in goal_states else -1

                self.Q_table[current_state, action] += self.learning_rate * (reward + self.discount_factor * np.max(self.Q_table[next_state]) - self.Q_table[current_state, action])
                #update state
                if next_state in goal_states:
                    break
                # assert(False)
                current_state = next_state
        
        
    

    def run_policy_2(self):
        """Run the environment using the learned policy from a Q-table."""

        self.env.reset()[0]  # Reset environment
        current_state = self.position_to_state_index()  # Convert starting position to index
        done = False
        step_count = 0  # Track steps to prevent infinite loops

        while not done and step_count < 100:  # Prevent infinite loops
            # Debugging: Print current state and Q-values
            print(f"Step {step_count}: State {current_state}")
            print(f"Q-values: {self.Q_table[current_state]}")
            
            action = np.argmax(self.Q_table[current_state])  # Choose best action
            print(f"Chosen action: {action}")

            # Before taking the step, print agent's current position
            print(f"Before step: Agent Pos: {self.env.agent_pos}, Dir: {self.env.agent_dir}")

            next_obs, _, done, _, _ = self.env.step(action)  # Take action

            # After step, print agent's new position
            print(f"After step: Agent Pos: {self.env.agent_pos}, Dir: {self.env.agent_dir}")

            next_state = self.position_to_state_index()  # Convert new state

            # Detect if the agent is looping in the same state
            if next_state == current_state:
                print(f"⚠️ Warning: Agent is stuck! Current state {current_state} is the same as next state {next_state}.")
                break  # Prevent infinite loop

            self.env.render()  # Visualize movement
            current_state = next_state  # Update current state
            step_count += 1





In [6]:
env = SimpleEnv(render_mode=None) 
env.reset()
q_training = Qlearning(env)
q_training.train(500)


Epoch 0: Starting state: 99
Epoch 0: Starting position: (np.int64(1), np.int64(4), np.int64(3))
greedy action: 0, exploiting
next_state: 98
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 2
greedy action: 0, exploiting
next_state: 97
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 1
random action: 0, exploring
next_state: 96
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 0
greedy action: 0, exploiting
next_state: 99
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 3
greedy action: 1, exploiting
next_state: 96
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 0
greedy action: 1, exploiting
next_state: 97
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 1
greedy action: 1, exploiting
next_state: 98
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 2
greedy action: 1, exploiting
next_state: 99
next  step: Agent Pos: (np.int64(1), np.int64(4)), Dir: 3
random action: 1, exploring
next_state: 96
next  step: Agent Pos: (np.int64(1), np.int64(

In [7]:
env_human = SimpleEnv(render_mode="human") #make same env but in human mode 

q_training.env= env_human #switch out the env in q training with human env
q_training.run_policy_2() #run the policy in the human env

Step 0: State 224
Q-values: [-8.71796514 -8.88759108 -8.64914714]
Chosen action: 2
Before step: Agent Pos: (1, 8), Dir: 0
After step: Agent Pos: (np.int64(2), np.int64(8)), Dir: 0
Step 1: State 228
Q-values: [-8.74493888 -8.77642652 -8.49905365]
Chosen action: 2
Before step: Agent Pos: (np.int64(2), np.int64(8)), Dir: 0
After step: Agent Pos: (np.int64(3), np.int64(8)), Dir: 0
Step 2: State 232
Q-values: [-8.6491069  -8.64914774 -8.33228183]
Chosen action: 2
Before step: Agent Pos: (np.int64(3), np.int64(8)), Dir: 0
After step: Agent Pos: (np.int64(4), np.int64(8)), Dir: 0
Step 3: State 236
Q-values: [-8.49765687 -8.49905364 -8.14697981]
Chosen action: 2
Before step: Agent Pos: (np.int64(4), np.int64(8)), Dir: 0
After step: Agent Pos: (np.int64(5), np.int64(8)), Dir: 0
Step 4: State 240
Q-values: [-7.94108868 -8.33228183 -8.33228123]
Chosen action: 0
Before step: Agent Pos: (np.int64(5), np.int64(8)), Dir: 0
After step: Agent Pos: (np.int64(5), np.int64(8)), Dir: 3
Step 5: State 243
Q-