In [None]:
%pip install minigrid

In [None]:
from __future__ import annotations

from minigrid.core.constants import COLOR_NAMES
from minigrid.core.grid import Grid
from minigrid.core.mission import MissionSpace
from minigrid.core.world_object import Door, Goal, Key, Wall
from minigrid.manual_control import ManualControl
from minigrid.minigrid_env import MiniGridEnv


class SimpleEnv(MiniGridEnv):
    """MiniGrid environment with a dividing wall, a locked door, a key and a goal.

    The wall, door and key coordinates are fixed (column 5, row 6, ...) and do
    not scale with ``size``; only the outer walls and the goal position follow
    the grid dimensions.
    """

    def __init__(
        self,
        size=10,
        agent_start_pos=(1, 1),
        agent_start_dir=0,
        max_steps: int | None = None,
        **kwargs,
    ):
        """Configure the grid and forward everything else to ``MiniGridEnv``.

        Args:
            size: Side length passed to the base class as ``grid_size``.
            agent_start_pos: Fixed start cell, or ``None`` to let
                ``place_agent()`` choose one.
            agent_start_dir: Start direction used together with
                ``agent_start_pos``.
            max_steps: Episode step limit; defaults to ``4 * size**2``.
            **kwargs: Extra keyword arguments for ``MiniGridEnv`` (for example
                ``render_mode``).
        """
        # Stored before super().__init__ so they are already set if the base
        # class builds the grid during construction.
        self.agent_start_pos = agent_start_pos
        self.agent_start_dir = agent_start_dir

        super().__init__(
            mission_space=MissionSpace(mission_func=self._gen_mission),
            grid_size=size,
            # Set this to True for maximum speed
            see_through_walls=True,
            max_steps=4 * size**2 if max_steps is None else max_steps,
            **kwargs,
        )

    @staticmethod
    def _gen_mission():
        """Return the constant mission string used by the mission space."""
        return "grand mission"

    def _gen_grid(self, width, height):
        """Build the grid: outer walls, dividing wall, door, key, goal and agent."""
        grid = Grid(width, height)
        self.grid = grid

        # Outer boundary.
        grid.wall_rect(0, 0, width, height)

        # Vertical separation wall along column 5.
        for row in range(height):
            grid.set(5, row, Wall())

        # A locked door replaces one wall cell; the key of the same colour
        # is placed at (3, 6).
        colour = COLOR_NAMES[0]
        grid.set(5, 6, Door(colour, is_locked=True))
        grid.set(3, 6, Key(colour))

        # Goal square in the bottom-right interior corner.
        self.put_obj(Goal(), width - 2, height - 2)

        # Agent: fixed pose when one was given, otherwise delegate placement.
        if self.agent_start_pos is None:
            self.place_agent()
        else:
            self.agent_pos = self.agent_start_pos
            self.agent_dir = self.agent_start_dir

        self.mission = "grand mission"


def main():
    """Create a ``SimpleEnv`` in "human" render mode and start manual control."""
    # enable manual control for testing
    ManualControl(SimpleEnv(render_mode="human"), seed=42).start()

In [None]:
# Notebook instance of the environment, using the "rgb_array" render mode
# rather than the "human" mode used in main().
env = SimpleEnv(render_mode="rgb_array")

In [None]:
# Attach the manual-control helper to the environment (seed=42 is passed through to it).
manual_control = ManualControl(env, seed=42)

In [None]:
import matplotlib.pyplot as plt
from minigrid.wrappers import ImgObsWrapper

In [None]:
# Rebinds `env` to the wrapped environment. Not idempotent: running this cell
# again wraps the already-wrapped environment a second time.
env = ImgObsWrapper(env)

In [None]:
# Recreate the helper so that it drives the wrapped `env` instead of the raw SimpleEnv.
manual_control = ManualControl(env, seed=42)

In [None]:
# Reset the environment through the manual-control helper.
manual_control.reset()

In [None]:
# Apply action index 0 once.
# NOTE(review): presumably "turn left" in MiniGrid's action enum — confirm.
manual_control.step(0)

In [None]:
# Start the manual-control loop.
# NOTE(review): `env` was created with render_mode="rgb_array", not "human" —
# confirm that this actually opens an interactive window.
manual_control.start()

# Train a PPO agent

In [None]:
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
import torch.nn as nn
import torch

In [None]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    """Small CNN feature extractor: three 2x2 convolutions followed by a linear layer.

    The flattened CNN output size is not hard-coded; it is measured once in
    ``__init__`` by pushing a sampled observation through the convolutions.
    """

    def __init__(
        self,
        # Quoted so the annotation is never evaluated at definition time: this
        # cell runs before the cell that does `import gymnasium as gym`.
        observation_space: "gym.Space",
        features_dim: int = 512,
        normalized_image: bool = False,
    ) -> None:
        """Build the convolutional stack and the projection to ``features_dim``.

        Args:
            observation_space: Observation space of the environment. Its first
                shape entry is used as the number of input channels
                (assumes a channel-first image space — TODO confirm against
                the wrapper stack in use).
            features_dim: Size of the feature vector returned by ``forward``.
            normalized_image: Accepted for signature compatibility; this
                extractor does not read it.
        """
        super().__init__(observation_space, features_dim)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 16, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass on a sampled observation;
        # `[None]` adds the batch dimension.
        with torch.no_grad():
            sample = torch.as_tensor(observation_space.sample()[None]).float()
            n_flatten = self.cnn(sample).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        """Return the features produced by the CNN followed by the linear head."""
        return self.linear(self.cnn(observations))

In [None]:
import gymnasium as gym

In [None]:
import minigrid
from minigrid.wrappers import ImgObsWrapper
from stable_baselines3 import PPO

# Use the custom extractor defined above, producing a 128-dimensional feature vector.
policy_kwargs = {
    "features_extractor_class": MinigridFeaturesExtractor,
    "features_extractor_kwargs": {"features_dim": 128},
}

# Rebinds `env`: from here on it is the wrapped MiniGrid-Empty-16x16 environment.
env = ImgObsWrapper(gym.make("MiniGrid-Empty-16x16-v0", render_mode="rgb_array"))

model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, verbose=1)
model.learn(2e5)

# Simple MDP (from GPT)

In [None]:
import numpy as np

class Gridworld:
    """Square grid world with four moves and a single terminal cell.

    States are ``(row, col)`` pairs. The terminal cell is the bottom-right
    corner. Every non-terminal step yields a reward of -1.
    """

    def __init__(self, size=5, stochastic=False):
        """Create the grid.

        Args:
            size: Side length of the grid.
            stochastic: When true, each step has a 10% chance of jumping to a
                random cell instead of the intended one.
        """
        self.size = size
        self.stochastic = stochastic
        self.states = np.arange(size * size).reshape((size, size))
        self.values = np.zeros((size, size))
        self.policy = np.full((size, size, 4), 1 / 4)  # Uniform random policy
        self.terminal_states = [(size - 1, size - 1)]
        self.action_space = ['up', 'down', 'left', 'right']

    def step(self, state, action):
        """Apply ``action`` in ``state`` and return ``(next_state, reward)``.

        Terminal states are absorbing and return ``(state, 0)`` unchanged.
        Moves that would leave the grid are clipped at the border, and an
        unrecognised action leaves the position as it is. The reward for any
        non-terminal step is -1.
        """
        if state in self.terminal_states:
            return state, 0

        # action -> (coordinate index, direction of travel)
        moves = {'up': (0, -1), 'down': (0, 1), 'left': (1, -1), 'right': (1, 1)}

        successor = list(state)
        if action in moves:
            axis, delta = moves[action]
            shifted = state[axis] + delta
            # Clip only on the side the move is heading towards.
            successor[axis] = max(0, shifted) if delta < 0 else min(self.size - 1, shifted)

        if self.stochastic and np.random.rand() < 0.1:
            successor = [np.random.randint(self.size), np.random.randint(self.size)]

        return tuple(successor), -1


In [None]:
import numpy as np

class MDP:
    """Ten-state chain MDP (S0..S9) with three actions and a reward of -1 per step.

    The instance is stateful: it tracks the current state and the reward
    accumulated so far. ``S0`` is the only terminal state and the process
    starts in ``S8``.
    """

    def __init__(self):
        """Set up states, per-state action lists, rewards and the start state."""
        self.state_inds = list(range(10))
        self.state_vals = [f"S{i}" for i in self.state_inds]
        # Every state offers the same three actions: A0, A1, A2.
        self.actions = {i: [f"A{j}" for j in range(3)] for i in self.state_inds}
        # Reward received on entering each state (indexed like state_inds).
        self.rewards = [-1 for i in self.state_inds]

        self.terminal_states = ["S0"]
        self.state = "S8"

        self.gamma = 0.95

        self.total_reward = 0

    def transition(self, action):
        """Apply ``action`` to the current state and advance the process.

        Formally a transition maps (state, action) to a distribution over
        (next state, reward) pairs that sums to 1, from which one pair is
        sampled. Currently the dynamics are deterministic:

        * ``"A0"`` moves one state down the chain (clipped at S0),
        * ``"A1"`` stays in place,
        * ``"A2"`` moves one state up the chain (clipped at the last state).

        Args:
            action: One of the actions listed for the current state.

        Returns:
            ``(next_state, reward)``. ``self.state`` and ``self.total_reward``
            are updated as a side effect.

        Raises:
            ValueError: If ``action`` is not available in the current state or
                has no dynamics defined. Nothing is mutated in that case.
        """
        state_ind = self.state_vals.index(self.state)

        # Reject actions outside A(s) before touching any state.
        if action not in self.actions[state_ind]:
            raise ValueError(
                f"action {action!r} is not available in state {self.state!r}"
            )

        if action == "A0":
            next_state_ind = max(state_ind - 1, 0)
        elif action == "A1":
            next_state_ind = state_ind
        elif action == "A2":
            next_state_ind = min(state_ind + 1, len(self.state_inds) - 1)
        else:
            raise ValueError(f"no dynamics defined for action {action!r}")

        reward = self.rewards[next_state_ind]
        next_state = self.state_vals[next_state_ind]

        # iterate:
        self.state = next_state
        self.total_reward += reward

        return next_state, reward


# policy

In [None]:
class StochasticPolicy:
    """Tabular policy: maps every state to a probability for each of its actions."""

    def __init__(self, mdp_instance):
        """
        Build the policy table from the MDP's states and per-state action lists.

        The table currently puts all probability mass on the first action
        listed for each state (``"A0"`` for the MDP in this notebook). Edit the
        comprehension below to try other distributions, e.g. uniform
        ``1.0 / len(actions)``.

        mdp_instance: MDP
            An instance of the MDP class. Must expose ``state_vals`` (ordered
            state names) and ``actions`` (dict keyed by state position).
        """
        self.policy = {}

        # `actions` is keyed by integer position, so walk the states with their index.
        for state_index, state in enumerate(mdp_instance.state_vals):
            actions = mdp_instance.actions[state_index]
            self.policy[state] = {
                action: (1.0 if rank == 0 else 0.0)
                for rank, action in enumerate(actions)
            }

    def get_action(self, state, rng=None):
        """
        Samples and returns an action for the given state based on the policy probabilities.

        state: key into the policy table.
        rng: optional ``numpy.random.Generator``; a fresh default generator is
            created when omitted.

        Returns the action object stored in the table (not a numpy scalar).
        """
        if rng is None:
            rng = np.random.default_rng()

        actions = list(self.policy[state])
        probabilities = list(self.policy[state].values())

        # Sample a position rather than the action itself so that the original
        # action object is returned unchanged.
        return actions[rng.choice(len(actions), p=probabilities)]


# simulate


In [None]:
# Roll out one episode: start from the MDP's initial state and follow the
# policy until a terminal state is reached.
world = MDP()

policy = StochasticPolicy(world)

step = 0  # number of transitions taken so far
# NOTE(review): there is no step cap, so this loop only ends if the policy
# eventually reaches a terminal state.
while world.state not in world.terminal_states:
    step += 1
    state = world.state
    action = policy.get_action(state)
    # transition() also advances world.state and world.total_reward.
    next_state, reward = world.transition(action)

# value function

In [None]:
class ValueFunctions:
    """State-value table V(s) for a policy on an MDP, with iterative policy evaluation."""

    def __init__(self, mdp_instance, policy_instance):
        """
        Create the value table.

        :param mdp_instance: MDP exposing ``state_vals``, ``terminal_states``,
            ``state``, ``gamma`` and ``transition(action)``
        :param policy_instance: object whose ``policy`` attribute maps each
            state to ``{action: probability}``
        """
        self.mdp = mdp_instance
        self.policy = policy_instance
        # Non-terminal states start from small random values. Terminal states
        # are pinned to 0 because policy_evaluation never updates them, so a
        # random start there would bias every other estimate.
        self.V = {
            state: 0.0 if state in mdp_instance.terminal_states else 0.1 * np.random.randn()
            for state in mdp_instance.state_vals
        }

    def policy_evaluation(self, theta=1e-9, max_iterations=100):
        """
        Perform policy evaluation to estimate the value function V(s) for the given policy.

        Each sweep updates V in place, state by state, using one call to
        ``mdp.transition`` per action as the outcome of that action. The MDP's
        ``state`` (and ``total_reward``, when present) are restored afterwards,
        so evaluation leaves the simulated episode untouched.

        :param theta: float, threshold for determining convergence
        :param max_iterations: int, maximum number of iterations to prevent infinite loops
        :return: None, updates the V attribute in place
        """
        # transition() mutates the MDP, so remember what must be put back.
        saved_state = self.mdp.state
        tracks_reward = hasattr(self.mdp, "total_reward")
        saved_total_reward = self.mdp.total_reward if tracks_reward else None

        try:
            for iteration in range(max_iterations):
                delta = 0  # Largest change seen in this sweep
                for state in self.V:
                    if state in self.mdp.terminal_states:
                        continue  # Skip update for terminal states
                    old_v = self.V[state]

                    # Get action probabilities from policy
                    action_probs = self.policy.policy.get(state, {})

                    # Update V(s) based on expected return
                    new_v = 0
                    for action, action_prob in action_probs.items():
                        # Every action is evaluated from `state`, so reset the
                        # MDP to it before each simulated step.
                        self.mdp.state = state
                        next_state, reward = self.mdp.transition(action)
                        new_v += action_prob * (reward + self.mdp.gamma * self.V[next_state])

                    # Update V(s) and track maximum change for convergence check
                    self.V[state] = new_v
                    delta = max(delta, abs(old_v - new_v))

                # Check for convergence
                if delta < theta:
                    print(f'Policy Evaluation converged in {iteration + 1} iterations.')
                    break
            else:
                print('Policy Evaluation reached maximum iterations without convergence.')
        finally:
            # Undo the side effects of the simulated transitions, even on error.
            self.mdp.state = saved_state
            if tracks_reward:
                self.mdp.total_reward = saved_total_reward
            


In [None]:
# Initialize value functions
# Build a value table for the `world` MDP under `policy`.
value_functions = ValueFunctions(world, policy)

# Perform policy evaluation (updates value_functions.V in place and prints a status line)
value_functions.policy_evaluation()

# Show resulting value function: dict mapping state name -> estimated value
value_functions.V


In [None]:
# Initialize value functions
# Same steps as the previous cell, repeated: rebinds `value_functions` to a
# new table with a new random initialisation.
value_functions = ValueFunctions(world, policy)

# Perform policy evaluation
value_functions.policy_evaluation()

# Show resulting value function
value_functions.V


In [None]:
# Display the current value table again (no recomputation).
value_functions.V

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Plot the estimated value of each state, in table order. The dict view is
# materialised into a list so that matplotlib receives a plain sequence of floats.
plt.plot(list(value_functions.V.values()))

# As a Gym environment

In [4]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class CustomMDP(gym.Env):
    """Ten-state chain MDP exposed through a gym-style interface.

    States are the integers 0..9 and state 0 is terminal. Actions: 0 moves one
    state down, 1 stays, 2 moves one state up (both clipped at the ends).
    Every non-terminal step gives a reward of -1.

    NOTE(review): ``step`` returns the 4-tuple ``(obs, reward, done, info)``
    and ``reset`` returns the bare observation. Gymnasium's ``Env`` API
    specifies a 5-tuple from ``step`` and ``(obs, info)`` from ``reset``. The
    return shapes are kept as they are because the cells below unpack them in
    this form; confirm what the training library in use expects.
    """

    START_STATE = 8

    def __init__(self):
        """Define the spaces and put the environment in its start state."""
        super().__init__()
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Discrete(10)
        self.state = self.START_STATE
        self.terminal_states = [0]
        self.gamma = 1.0
        self.total_reward = 0

    def step(self, action):
        """Advance one step.

        Args:
            action: 0, 1 or 2.

        Returns:
            ``(state, reward, done, info)``. Once a terminal state has been
            reached, further calls return it with reward 0 and ``done=True``.

        Raises:
            ValueError: If ``action`` is not 0, 1 or 2. The state is left
                unchanged in that case.
        """
        if self.state in self.terminal_states:
            return self.state, 0, True, {}  # Terminal state, no reward, episode is done

        if action == 0:  # "A0"
            next_state = max(self.state - 1, 0)
        elif action == 1:  # "A1"
            next_state = self.state
        elif action == 2:  # "A2"
            # Upper bound derived from the observation space; int() keeps the
            # state a plain Python int.
            next_state = min(self.state + 1, int(self.observation_space.n) - 1)
        else:
            raise ValueError(f"invalid action {action!r}; expected 0, 1 or 2")

        reward = -1
        self.state = next_state
        self.total_reward += reward
        done = self.state in self.terminal_states
        return self.state, reward, done, {}

    def reset(self, seed=None, options=None):
        """Return to the start state, clear the reward total and return the state.

        ``seed`` is forwarded to ``gym.Env.reset`` so the environment's RNG is
        seeded; ``options`` is accepted for signature compatibility and unused.
        """
        super().reset(seed=seed)
        self.state = self.START_STATE
        self.total_reward = 0
        return self.state

    def render(self, mode='human'):
        """Print the current state and the reward accumulated this episode."""
        print(f"State: {self.state}, Total Reward: {self.total_reward}")

env = CustomMDP()


In [None]:
??env

In [5]:
# Random rollout: keep sampling actions from the action space until the
# environment reports the episode as done, printing the state after each step.
env.reset()
done = False
while not done:
    action = env.action_space.sample()
    # step() is unpacked as a 4-tuple here; the info dict is discarded.
    state, reward, done, _ = env.step(action)
    env.render()



State: 8, Total Reward: -1
State: 9, Total Reward: -2
State: 8, Total Reward: -3
State: 8, Total Reward: -4
State: 7, Total Reward: -5
State: 6, Total Reward: -6
State: 6, Total Reward: -7
State: 7, Total Reward: -8
State: 8, Total Reward: -9
State: 8, Total Reward: -10
State: 8, Total Reward: -11
State: 9, Total Reward: -12
State: 9, Total Reward: -13
State: 9, Total Reward: -14
State: 9, Total Reward: -15
State: 9, Total Reward: -16
State: 9, Total Reward: -17
State: 9, Total Reward: -18
State: 9, Total Reward: -19
State: 8, Total Reward: -20
State: 8, Total Reward: -21
State: 7, Total Reward: -22
State: 7, Total Reward: -23
State: 7, Total Reward: -24
State: 8, Total Reward: -25
State: 9, Total Reward: -26
State: 9, Total Reward: -27
State: 9, Total Reward: -28
State: 9, Total Reward: -29
State: 8, Total Reward: -30
State: 9, Total Reward: -31
State: 9, Total Reward: -32
State: 8, Total Reward: -33
State: 9, Total Reward: -34
State: 9, Total Reward: -35
State: 9, Total Reward: -36
S

In [None]:
import stable_baselines3 as sb3

# Define the agent (policy): PPO with an MLP policy on the CustomMDP instance `env`.
# NOTE(review): CustomMDP.step returns a 4-tuple and reset returns a bare
# observation — confirm the installed stable-baselines3 version accepts that.
agent = sb3.PPO("MlpPolicy", env, verbose=1)

# Train the agent for 10,000 environment steps
agent.learn(total_timesteps=10000)


In [None]:
# Roll out the trained agent with deterministic action selection until the
# episode is done, printing the state after each step.
obs = env.reset()
done = False
while not done:
    # predict() returns (action, state); the second value is discarded.
    action, _ = agent.predict(obs, deterministic=True)
    obs, reward, done, _ = env.step(action)
    env.render()


In [None]:
# Number of discrete states in the observation space.
env.observation_space.n

In [None]:
# Ad-hoc probe: evaluate action 0 in state 5 with the trained policy.
# Both arguments are float tensors of shape (1,) after unsqueeze(0).
agent.policy.evaluate_actions(torch.tensor(5).float().unsqueeze(0), torch.tensor(0).float().unsqueeze(0))




In [None]:
import torch

# Collect the policy's value estimate for every state, in state order.
value_estimates = []

for state in range(env.observation_space.n):
    # Float tensor of shape (1, 1) holding the state index.
    obs_tensor = torch.tensor([state]).float().unsqueeze(0)
    # You can pass dummy actions since they are not used for value estimation
    actions = torch.tensor([0]).float().unsqueeze(0)
    # evaluate_actions returns three values; only the first (the value) is kept.
    value, _, _ = agent.policy.evaluate_actions(obs_tensor, actions)
    value_estimates.append(value.item())

# Print the value function estimates
for state, value in enumerate(value_estimates):
    print(f"V(S{state}) = {value:.4f}")


In [24]:
obs.n

10

In [9]:
obs = spaces.Discrete(10)

In [23]:
obs[]

TypeError: 'Discrete' object is not callable