# Settings

## Import

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator, AutoMinorLocator
from mpl_toolkits.axes_grid1 import make_axes_locatable
from IPython import display
import time
import json
from copy import deepcopy

## Support

In [None]:
plt.rc('font', size=30)  # controls default text sizes
plt.rc('axes', titlesize=25)  # fontsize of the axes title
plt.rc('axes', labelsize=25)  # fontsize of the x and y labels
plt.rc('xtick', labelsize=17)  # fontsize of the tick labels
plt.rc('ytick', labelsize=17)  # fontsize of the tick labels
plt.rc('legend', fontsize=20)  # legend fontsize
plt.rc('figure', titlesize=30)
plt.tight_layout()

def plot(V, pi):
    # plot value
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12.5, 5))
    ax1.axis('on')
    ax1.cla()
    states = np.arange(V.shape[0])
    ax1.bar(states, V, edgecolor='none')
    ax1.set_xlabel('State')
    ax1.set_ylabel('Value', rotation='horizontal', ha='right')
    ax1.set_title('Value Function')
    ax1.xaxis.set_major_locator(MaxNLocator(integer=True, nbins=6))
    ax1.yaxis.grid()
    ax1.set_ylim(bottom=V.min())
    # plot policy
    ax2.axis('on')
    ax2.cla()
    im = ax2.imshow(pi.T, cmap='Greys', vmin=0, vmax=1, aspect='auto')
    ax2.invert_yaxis()
    ax2.set_xlabel('State')
    ax2.set_ylabel('Action', rotation='horizontal', ha='right')
    ax2.set_title('Policy')
    start, end = ax2.get_xlim()
    ax2.xaxis.set_ticks(np.arange(start, end), minor=True)
    ax2.xaxis.set_major_locator(MaxNLocator(integer=True, nbins=6))
    ax2.yaxis.set_major_locator(MaxNLocator(integer=True, nbins=6))
    start, end = ax2.get_ylim()
    ax2.yaxis.set_ticks(np.arange(start, end), minor=True)
    ax2.grid(which='minor')
    divider = make_axes_locatable(ax2)
    cax = divider.append_axes('right', size='5%', pad=0.20)
    cbar = fig.colorbar(im, cax=cax, orientation='vertical')
    cbar.set_label('Probability', rotation=0, ha='left')
    fig.subplots_adjust(wspace=0.5)
    display.clear_output(wait=True)
    display.display(fig)
    time.sleep(0.001)
    plt.close()

class Transitions(list):

    def __init__(self, transitions):
        self.__transitions = transitions
        super().__init__(transitions)

    def __repr__(self):
        repr = '{:<14} {:<10} {:<10}'.format('Next State', 'Reward',
                                             'Probability')
        repr += '\n'
        for i, (s, r, p) in enumerate(self.__transitions):
            repr += '{:<14} {:<10} {:<10}'.format(s, round(r, 2), round(p, 2))
            if i != len(self.__transitions) - 1:
                repr += '\n'
        return repr

<Figure size 640x480 with 0 Axes>

# Environments

## GridWorld

In [None]:
class GridWorld:

    def __init__(self, size, initial_value=None):
        self.size = size
        self.states = np.arange(size * size)
        self.actions = np.arange(4)
        self.grid = np.full((size, size), initial_value)

    def transitions(self, s, a):
        return np.array([[r, self.p(s_, r, s, a)]
                         for s_, r in self.support(s, a)])

    def support(self, s, a):
        return [(s_, self.reward(s, s_)) for s_ in self.states]

    def p(self, next_state, reward, state, action):
        row = state // self.size
        col = state % self.size

        if (state == 0 and next_state == 0):
            return 1.0

        if (state == len(self.states) - 1 and next_state == len(self.states) - 1):
            return 1.0

        if (state == 0 or state == len(self.states) - 1):
            return 0.0

        #up = 0
        if (action == 0):
            new_col = col
            new_row = max(row - 1, 0)

        #down = 1
        if (action == 1):
            new_col = col
            new_row = min(row + 1, self.size - 1)

        #left = 2
        if (action == 2):
            new_col = max(col - 1, 0)
            new_row = row

        #right = 3
        if (action == 3):
            new_col = min(col + 1, self.size - 1)
            new_row = row

        new_state = new_row * self.size + new_col
        if (new_state == next_state):
            return 1.0
        else:
            return 0.0

    def reward(self, state, next_state):
        if (state == 0 or state == len(self.states) - 1):
            return 0.0
        else:
            return -1.0

    @property
    def A(self):
        return list(self.actions)

    @property
    def S(self):
        return list(self.states)

    def is_done(self, state):
        return True if state == 0 or state == 15 else False

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class GridWorldEnv(gym.Env):
    """
    Custom Grid World environment.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, grid_size=(4, 4), start_pos=(0, 0), goal_pos=(3, 3), obstacles=[(1, 1), (2, 2), (3, 3)]):
        super(GridWorldEnv, self).__init__()

        # Define the action space (up, down, left, right)
        self.action_space = spaces.Discrete(4)

        # Define the observation space (grid size)
        self.observation_space = spaces.Box(low=0, high=grid_size[0]-1, shape=(2,), dtype=np.int64)

        # Set the grid size
        self.grid_size = grid_size

        # Set the start and goal positions
        self.start_pos = start_pos
        self.goal_pos = goal_pos

        # Set the obstacles
        self.obstacles = obstacles

        # Initialize the agent's position
        self.agent_pos = start_pos

        # Initialize the reward
        self.reward = 0

        # Initialize the done flag
        self.done = False

    def step(self, action):
        """
        Perform an action in the environment.

        Args:
            action (int): The action to perform (0: up, 1: down, 2: left, 3: right).

        Returns:
            np.ndarray: The new state (agent's position).
            float: The reward for the action.
            bool: Whether the episode has ended.
            dict: Additional information (empty).
        """
        # Get the current position
        x, y = self.agent_pos

        # Update the position based on the action
        if action == 0:  # up
            new_x, new_y = x, y + 1
        elif action == 1:  # down
            new_x, new_y = x, y - 1
        elif action == 2:  # left
            new_x, new_y = x - 1, y
        else:  # right
            new_x, new_y = x + 1, y

        # Check if the new position is within the grid and not an obstacle
        if 0 <= new_x < self.grid_size[0] and 0 <= new_y < self.grid_size[1] and (new_x, new_y) not in self.obstacles:
            self.agent_pos = (new_x, new_y)


        next_state = self.agent_pos[0] * self.grid_size[0] + self.agent_pos[1]

        # Check if the agent has reached the goal
        if self.agent_pos in self.goal_pos:
            self.reward = 0
            self.done = True
        else:
            self.reward = -1
            self.done = False

        return next_state, self.reward, self.done, {}, {}

    def reset(self, seed=None, options=None):
        """
        Reset the environment to the initial state.

        Returns:
            np.ndarray: The initial state (agent's position).
        """
        self.agent_pos = self.start_pos
        self.reward = 0
        self.done = False

        state = self.agent_pos[0] * self.grid_size[0]  + self.agent_pos[1]

        return state

    def render(self, mode='human'):
        """
        Render the environment.

        Args:
            mode (str): The rendering mode ('human' or 'rgb_array').
        """
        # Create a grid to represent the environment
        grid = np.zeros((self.grid_size[0], self.grid_size[1]), dtype=str)

        # Set the goal position
        for goal in self.goal_pos:
            grid[goal] = 'G'

        # Set the obstacles
        for obstacle in self.obstacles:
            grid[obstacle] = 'X'

        # Set the agent's position
        grid[self.agent_pos] = 'A'

        # Print the grid
        print(grid)

    @property
    def A(self):
        return self.action_space

    @property
    def S(self):
        return self.observation_space

# Dynamic Progamming

## Policy Evaluation

In [None]:
def bellman_update(env, V, pi, s, gamma):
    sum_v = 0.0
    for a in env.A:
        transitions = env.transitions(s, a)
        for next_state in env.S:
            reward = transitions[next_state, 0]
            transition_prob = transitions[next_state, 1]
            sum_v += pi[s][a] * transition_prob * (reward + gamma * V[next_state])
    V[s] = sum_v

def evaluate_policy(env, V, pi, gamma, theta):
    delta = float('inf')
    while delta > theta:
        delta = 0
        for s in env.S:
            v = V[s]
            bellman_update(env, V, pi, s, gamma)
            delta = max(delta, abs(v - V[s]))

    return V

def evaluate_policy2(env, V, pi, gamma, steps):
    for k in range(steps):
        for s in env.S:
            v = V[s]
            bellman_update(env, V, pi, s, gamma)
    return V

## Policy Iteration

In [None]:
def q_greedify_policy(env, V, pi, s, gamma):
    q_pi = np.zeros(len(env.A))
    for a in env.A:
        transitions = env.transitions(s, a)
        for next_state in env.S:
            reward = transitions[next_state, 0]
            transition_prob = transitions[next_state, 1]
            q_pi[a] += transition_prob * (reward + gamma * V[next_state])
    best_action = np.argmax(q_pi)
    pi[s] = np.eye(len(env.A))[best_action]


def improve_policy(env, V, pi, gamma):
    policy_stable = True
    for s in env.S:
        old = pi[s].copy()
        q_greedify_policy(env, V, pi, s, gamma)

        if not np.array_equal(pi[s], old):
            policy_stable = False

    return pi, policy_stable

def policy_iteration(env, gamma, theta):
    V = np.zeros(len(env.S))
    pi = np.ones((len(env.S), len(env.A))) / len(env.A)
    policy_stable = False

    while not policy_stable:
        V = evaluate_policy(env, V, pi, gamma, theta)
        pi, policy_stable = improve_policy(env, V, pi, gamma)

    return V, pi


## Value Iteration

In [None]:
def bellman_optimality_update(env, V, s, gamma):
    v = np.zeros(len(env.A))

    for a in env.A:
        transitions = env.transitions(s, a)
        for next_state in env.S:
            reward = transitions[next_state, 0]
            transition_prob = transitions[next_state, 1]
            v[a] += transition_prob * (reward + gamma * V[next_state])
    V[s] = np.max(v)

def value_iteration(env, gamma, theta):
    V = np.zeros(len(env.S))

    while True:
        delta = 0
        for s in env.S:
            v = V[s]
            bellman_optimality_update(env, V, s, gamma)
            delta = max(delta, abs(v - V[s]))

        if delta < theta:
            break
    pi = np.ones((len(env.S), len(env.A))) / len(env.A)
    for s in env.S:
        q_greedify_policy(env, V, pi, s, gamma)
    return V, pi

def value_iteration2(env, gamma, theta):
    V = np.zeros(len(env.S))
    pi = np.ones((len(env.S), len(env.A))) / len(env.A)
    while True:
        delta = 0
        for s in env.S:
            v = V[s]
            q_greedify_policy(env, V, pi, s, gamma)
            bellman_update(env, V, pi, s, gamma)
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break
    return V, pi

# Monte Carlo

In [None]:
def first_visit_mc_prediction(env, policy, num_episodes, gamma=1.0):
    """
    Performs first-visit Monte Carlo prediction to estimate the value function.

    Args:
        env (gym.Env): The environment representing the MDP.
        policy (function): A function that maps states to actions.
        num_episodes (int): The number of episodes to simulate.
        gamma (float): The discount factor.

    Returns:
        dict: A dictionary mapping states to their estimated values.
    """
    # Initialize the value function
    V = {}

    # Initialize the returns for each state
    returns = {}

    for _ in range(num_episodes):
        # Reset the environment
        state = env.reset()

        # Generate an episode using the given policy
        episode = []
        done = False
        while not done:
            action = policy(state)
            next_state, reward, done, _, _ = env.step(action)
            episode.append((state, reward))
            state = next_state

        # Calculate the return for each state in the episode
        G = 0.0
        t = len(episode) - 1
        for state, reward in reversed(episode):
            G = gamma * G + reward
            if state not in [s for s, _ in episode[:t - 1]]:
                if state not in returns:
                    returns[state] = []
                returns[state].append(G)
            t -= 1

    # Calculate the average return for each state

    for state, total_returns in returns.items():
        V[state] = np.mean(total_returns)

    return V

# Temporal Difference


## TD(0)

# Test

## ENV

In [None]:
!pip install gymnasium



In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class GridWorldEnv(gym.Env):
    """
    Custom Grid World environment.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, grid_size=(4, 4), start_pos=(0, 0), goal_pos=(3, 3), obstacles=[(1, 1), (2, 2), (3, 3)]):
        super(GridWorldEnv, self).__init__()

        # Define the action space (up, down, left, right)
        self.action_space = spaces.Discrete(4)

        # Define the observation space (grid size)
        self.observation_space = spaces.Box(low=0, high=grid_size[0]-1, shape=(2,), dtype=np.int64)

        # Set the grid size
        self.grid_size = grid_size

        # Set the start and goal positions
        self.start_pos = start_pos
        self.goal_pos = goal_pos

        # Set the obstacles
        self.obstacles = obstacles

        # Initialize the agent's position
        self.agent_pos = start_pos

        # Initialize the reward
        self.reward = 0

        # Initialize the done flag
        self.done = False

    def step(self, action):
        """
        Perform an action in the environment.

        Args:
            action (int): The action to perform (0: up, 1: down, 2: left, 3: right).

        Returns:
            np.ndarray: The new state (agent's position).
            float: The reward for the action.
            bool: Whether the episode has ended.
            dict: Additional information (empty).
        """
        # Get the current position
        x, y = self.agent_pos

        # Update the position based on the action
        if action == 0:  # up
            new_x, new_y = x, y + 1
        elif action == 1:  # down
            new_x, new_y = x, y - 1
        elif action == 2:  # left
            new_x, new_y = x - 1, y
        else:  # right
            new_x, new_y = x + 1, y

        # Check if the new position is within the grid and not an obstacle
        if 0 <= new_x < self.grid_size[0] and 0 <= new_y < self.grid_size[1] and (new_x, new_y) not in self.obstacles:
            self.agent_pos = (new_x, new_y)


        next_state = self.agent_pos[0] * self.grid_size[0] + self.agent_pos[1]

        # Check if the agent has reached the goal
        if self.agent_pos in self.goal_pos:
            self.reward = 0
            self.done = True
        else:
            self.reward = -1
            self.done = False

        return next_state, self.reward, self.done, {}, {}

    def reset(self, seed=None, options=None):
        """
        Reset the environment to the initial state.

        Returns:
            np.ndarray: The initial state (agent's position).
        """
        self.agent_pos = self.start_pos
        self.reward = 0
        self.done = False

        state = self.agent_pos[0] * self.grid_size[0]  + self.agent_pos[1]

        return state

    def render(self, mode='human'):
        """
        Render the environment.

        Args:
            mode (str): The rendering mode ('human' or 'rgb_array').
        """
        # Create a grid to represent the environment
        grid = np.zeros((self.grid_size[0], self.grid_size[1]), dtype=str)

        # Set the goal position
        for goal in self.goal_pos:
            grid[goal] = 'G'

        # Set the obstacles
        for obstacle in self.obstacles:
            grid[obstacle] = 'X'

        # Set the agent's position
        grid[self.agent_pos] = 'A'

        # Print the grid
        print(grid)

    @property
    def A(self):
        return self.action_space

    @property
    def S(self):
        return self.observation_space

## MC

In [None]:
def first_visit_mc_prediction(env, policy, num_episodes, gamma=1.0):
    """
    Performs first-visit Monte Carlo prediction to estimate the value function.

    Args:
        env (gym.Env): The environment representing the MDP.
        policy (function): A function that maps states to actions.
        num_episodes (int): The number of episodes to simulate.
        gamma (float): The discount factor.

    Returns:
        dict: A dictionary mapping states to their estimated values.
    """
    # Initialize the value function
    V = {}

    # Initialize the returns for each state
    returns = {}

    for _ in range(num_episodes):
        # Reset the environment
        state = env.reset()

        # Generate an episode using the given policy
        episode = []
        done = False
        while not done:
            action = policy(state)
            next_state, reward, done, _, _ = env.step(action)
            episode.append((state, reward))
            state = next_state

        # Calculate the return for each state in the episode
        G = 0.0
        t = len(episode) - 1
        for state, reward in reversed(episode):
            G = gamma * G + reward
            if state not in [s for s, _ in episode[:t - 1]]:
                if state not in returns:
                    returns[state] = []
                returns[state].append(G)
            t -= 1

    # Calculate the average return for each state
    for state, total_returns in returns.items():
        V[state] = np.mean(total_returns)

    return V

In [None]:
class MCExploringStarts:
    def __init__(self, env, gamma=0.99, epsilon=0.1, alpha=0.1):
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.alpha = alpha
        self.Q = defaultdict(lambda: np.zeros(env.action_space.n))
        self.returns = defaultdict(list)

    def select_action(self, state):
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        else:
            return np.argmax(self.Q[state])

    def update(self, episode):
        G = 0
        for state, action, reward in reversed(episode):
            G = reward + self.gamma * G
            if (state, action) not in [(s, a) for s, a, _ in episode[:-1]]:
                self.returns[(state, action)].append(G)
                self.Q[state][action] = np.mean(self.returns[(state, action)])

    def train(self, num_episodes):
        for _ in range(num_episodes):
            episode = []
            state = self.env.reset()
            done = False
            while not done:
                action = self.select_action(state)
                next_state, reward, done, _ = self.env.step(action)
                episode.append((state, action, reward))
                state = next_state
            self.update(episode)

# Example usage
env = gym.make('CartPole-v1')
agent = MCExploringStarts(env)
agent.train(num_episodes=10000)

In [None]:
# Monte Carlo Exploring Starts algorithm
class MonteCarloES:
    def __init__(self, env, gamma=1.0, epsilon=0.1):
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.q_values = {}
        self.returns = {}

        for i in range(env.size):
            for j in range(env.size):
                for action in env.get_possible_actions():
                    self.q_values[((i, j), action)] = 0.0
                    self.returns[((i, j), action)] = []

    def choose_action(self, state):
        if random.uniform(0, 1) < self.epsilon:
            return random.choice(self.env.get_possible_actions())
        else:
            q_values = [self.q_values[(state, a)] for a in self.env.get_possible_actions()]
            return np.argmax(q_values)

    def generate_episode(self):
        episode = []
        state = (random.randint(0, self.env.size-1), random.randint(0, self.env.size-1))
        self.env.state = state

        while True:
            action = self.choose_action(state)
            next_state, reward, done = self.env.step(action)
            episode.append((state, action, reward))
            if done:
                break
            state = next_state

        return episode

    def update_q_values(self, episode):
        g = 0
        for t in range(len(episode)-1, -1, -1):
            state, action, reward = episode[t]
            g = self.gamma * g + reward
            if (state, action) not in [(x[0], x[1]) for x in episode[:t]]:
                self.returns[(state, action)].append(g)
                self.q_values[(state, action)] = np.mean(self.returns[(state, action)])

    def train(self, episodes=1000):
        for _ in range(episodes):
            episode = self.generate_episode()
            self.update_q_values(episode)

# Initialize environment and agent
gridworld = Gridworld(size=5, start=(0, 0), goal=(4, 4))
mc_agent = MonteCarloES(gridworld)

# Train the agent
mc_agent.train(episodes=10000)

# Print Q-values
for state_action, value in mc_agent.q_values.items():
    print(f"State-Action: {state_action}, Value: {value:.2f}")

In [None]:
import numpy as np

# Initialize Q-table
Q = np.zeros((num_states, num_actions))

# Define exploration policy
epsilon = 0.1

# Implement Monte Carlo Exploring Starts
for episode in range(num_episodes):
    state = env.reset()
    done = False
    while not done:
        # Choose action based on epsilon-greedy policy
        if np.random.rand() < epsilon:
            action = np.random.choice(num_actions)
        else:
            action = np.argmax(Q[state])

        next_state, reward, done, _ = env.step(action)

        # Update Q-value based on the reward received
        Q[state][action] += alpha * (reward + gamma * np.max(Q[next_state]) - Q[state][action])

        state = next_state

def mc_exploring_starts(env, num_episodes, gamma=1.0):
    """
    Performs first-visit Monte Carlo prediction to estimate the value function.

    Args:
        env (gym.Env): The environment representing the MDP.
        policy (function): A function that maps states to actions.
        num_episodes (int): The number of episodes to simulate.
        gamma (float): The discount factor.

    Returns:
        dict: A dictionary mapping states to their estimated values.
    """

    policy = np.random.rand(3,2)
    Q = np.zeros((num_states, num_actions))
    returns = np.zeros((num_states, num_actions))

    # Initialize the returns for each state
    returns = {}

    for _ in range(num_episodes):
        # Reset the environment
        state = env.reset()

        # Generate an episode using the given policy
        episode = []
        done = False
        while not done:
            action = policy(state)
            next_state, reward, done, _, _ = env.step(action)
            episode.append((state, reward))
            state = next_state

        # Calculate the return for each state in the episode
        G = 0.0
        t = len(episode) - 1
        for state, reward in reversed(episode):
            G = gamma * G + reward
            if state not in [s for s, _ in episode[:t - 1]]:
                if state not in returns:
                    returns[state] = []
                returns[state].append(G)
            t -= 1

    # Calculate the average return for each state
    for state, total_returns in returns.items():
        V[state] = np.mean(total_returns)

    return V

## RUN

In [None]:
# Create the grid world environment
env = GridWorldEnv(start_pos=(3, 0), goal_pos=[(0,0), (3,3)], obstacles=[])

print(env.A.dtype)
print(env.S)

'''
state = env.reset()

# Render the environment
env.render()

# Define a random policy
def policy(state):
    return env.action_space.sample()

# Run the first-visit Monte Carlo prediction
value_function = first_visit_mc_prediction(env, policy, num_episodes=50000)

print(value_function)


# Create a grid to represent the environment
grid = np.zeros((4, 4))

for state, total_returns in value_function.items():
    row = state // 4
    col = state % 4
    grid[row, col] = total_returns

# Print the grid
print(grid)

'''

int64
Box(0, 3, (2,), int64)


'\nstate = env.reset()\n\n# Render the environment\nenv.render()\n\n# Define a random policy\ndef policy(state):\n    return env.action_space.sample()\n\n# Run the first-visit Monte Carlo prediction\nvalue_function = first_visit_mc_prediction(env, policy, num_episodes=50000)\n\nprint(value_function)\n\n\n# Create a grid to represent the environment\ngrid = np.zeros((4, 4))\n\nfor state, total_returns in value_function.items():\n    row = state // 4\n    col = state % 4\n    grid[row, col] = total_returns\n\n# Print the grid\nprint(grid)\n\n'

In [None]:
import gymnasium as gym
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True)


env.reset()
print(env.step(0))

# Define a random policy
def policy(state):
    return env.action_space.sample()

# Run the first-visit Monte Carlo prediction
value_function = first_visit_mc_prediction(env, policy, num_episodes=10000)

# Print the estimated value function
for state, value in value_function.items():
    print(f"State: {state}, Value: {value:.2f}")


(4, 0.0, False, False, {'prob': 0.3333333333333333})
State: 4, Value: 0.02
State: 0, Value: 0.01
State: 1, Value: 0.01
State: 10, Value: 0.14
State: 6, Value: 0.04
State: 3, Value: 0.01
State: 2, Value: 0.02
State: 9, Value: 0.08
State: 8, Value: 0.04
State: 13, Value: 0.15
State: 14, Value: 0.42
