In [2]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import ipywidgets as widgets
from IPython.display import display
from matplotlib import patches

# MDP class

In [3]:
class MDP:
    """A Markov Decision Process, defined by an initial state, transition model,
    and reward function. We also keep track of a gamma value, for use by
    algorithms. The transition model is represented somewhat differently from
    the text. Instead of P(s' | s, a) being a probability number for each
    state/state/action triplet, we instead have T(s, a) return a
    list of (p, s') pairs. We also keep track of the possible states,
    terminal states, and actions for each state. [Page 646]"""

    def __init__(self, start_state, actions_list,
                 terminals, transitions, reward, gamma=0.9):
        if not (0 < gamma <= 1):
            raise ValueError("An MDP must have 0 < gamma <= 1")

        self.start_state = start_state
        self.actions_list = actions_list
        self.terminals = terminals
        self.transitions = transitions
        self.reward = reward
        self.gamma = gamma

    def R(self, state):
        """Return a numeric reward for this state."""
        return self.reward[state]

    def T(self, state, action):
        """Transition model. From a state and an action, return a list
        of (probability, result-state) pairs."""
        return self.transitions[state][action]

    def actions(self, state):
        """Return a list of actions that can be performed in this state. By default, a
        fixed list of actions, except for terminal states. Override this
        method if you need to specialize by state."""

        if state in self.terminals:
            return [None]
        else:
            return self.actions_list

In [4]:
orientations = FORWARD, RIGHT, UP, BACKWARD, LEFT, DOWN = [(1, 0, 0),
                                                           (0, 1, 0),
                                                           (0, 0, 1),
                                                           (-1, 0, 0),
                                                           (0, -1, 0),
                                                           (0, 0, -1)]

In [5]:
# Function to rotate a vector
def rotate_vector(vector, axis, angle_degree):
    if isinstance(vector, tuple):
        vector = np.array(vector)
    angle_rad = np.radians(angle_degree)
    # Rotation matrix around the x-axis
    if axis == 'x':
        rot_matrix = np.array([
            [1, 0, 0],
            [0, np.cos(angle_rad), -np.sin(angle_rad)],
            [0, np.sin(angle_rad), np.cos(angle_rad)]
        ])
    # Rotation matrix around the y-axis
    elif axis == 'y':
        rot_matrix = np.array([
            [np.cos(angle_rad), 0, np.sin(angle_rad)],
            [0, 1, 0],
            [-np.sin(angle_rad), 0, np.cos(angle_rad)]
        ])
    # Rotation matrix around the z-axis
    elif axis == 'z':
        rot_matrix = np.array([
            [np.cos(angle_rad), -np.sin(angle_rad), 0],
            [np.sin(angle_rad), np.cos(angle_rad), 0],
            [0, 0, 1]
        ])
    else:
        raise ValueError('Axis must be either x or y, or z')
    # Apply the rotation
    result = np.dot(rot_matrix, vector)
    return np.round(result).astype(int)

In [6]:
def keep_forward(vector):
    return np.array(vector)


def turn_left(vector):
    # Rotate -90 degrees around the z-axis
    return rotate_vector(vector, 'z', -90)


def turn_right(vector):
    # Rotate 90 degrees around the z-axis
    return rotate_vector(vector, 'z', 90)


def turn_up(vector):
    # Rotate -90 degrees around the y-axis
    return rotate_vector(vector, 'y', -90)


def turn_down(vector):
    # Rotate 90 degrees around the y-axis
    return rotate_vector(vector, 'y', 90)


def turn_backward(vector):
    # Rotate 180 degrees around the y-axis
    return -np.array(vector)

In [7]:
def get_action_distribution(forward_prob, dim=3):
    if not (dim == 2 or dim == 3):
        raise ValueError("Dimension must be 2 or 3")
    distributions = [(forward_prob, keep_forward)]
    turn_actions = [turn_up, turn_down, turn_backward]
    if dim == 3:
        turn_actions += [turn_left, turn_right]

    complement_prob = 1 - forward_prob
    distributions.extend(((complement_prob / len(turn_actions), action) for action in turn_actions))
    return distributions

In [8]:
class GridMDP(MDP):

    def __init__(self, start_state, grid, action_distribution, terminals, gamma=.9):
        # grid.reverse()  # because we want row 0 on bottom, not on top
        reward = dict()
        self.states = set()
        # states = set()
        self.rows = len(grid)
        self.cols = len(grid[0])
        y = 0
        for x in range(self.cols):
            for z in range(self.rows):
                self.states.add((x, y, z))
                reward[(x, y, z)] = grid[z][x]

        actions_list = [FORWARD, UP, DOWN, BACKWARD]
        self.action_distribution = action_distribution
        terminals = [(col, 0, row) for (row, col) in terminals]

        transitions = dict()
        for s in self.states:
            transitions[s] = dict()
            for a in actions_list:
                transitions[s][a] = self.calculate_T(s, a)
        super().__init__(start_state=start_state, actions_list=actions_list,
                         terminals=terminals, transitions=transitions,
                         reward=reward, gamma=gamma)

    def calculate_T(self, state, action):
        return [(prob, self.go(state, transform(action))) for prob, transform in self.action_distribution]

    def T(self, state, action):
        return self.transitions[state][action] if action else [(0.0, state)]

    def go(self, state, direction):
        """Return the state that results from going in this direction."""
        if isinstance(direction, tuple):
            direction = np.array(direction)
        if isinstance(state, tuple):
            state = np.array(state)
        go_state = tuple(state + direction)
        return go_state if go_state in self.states else tuple(state)



In [9]:

rewards = [[-0.04, -0.04, -0.04, -1, -1, -0.04, -0.04, -0.04, -0.04, -0.04, -0.04, -1, -0.04],
           [-0.04, -0.04, -0.04, -1, -1, -0.04, -0.04, -0.04, -0.04, -0.04, -0.04, -1, -0.04],
           [-0.04, -0.04, -0.04, -0.04, -1, -0.04, -0.04, -0.04, -1, -0.04, -0.04, -0.04, -0.04],
           [-0.04, -0.04, -0.04, -0.04, -1, -0.04, -0.04, -0.04, -1, -0.04, -0.04, -0.04, -0.04],
           [-0.04, -1, -0.04, -0.04, -0.04, -0.04, -0.04, -1, -1, -1, -0.04, -0.04, +1],
           [-0.04, -1, -0.04, -0.04, -0.04, -0.04, -0.04, -1, -1, -1, -0.04, -0.04, -0.04]]

obstacles = [(0, 3), (0, 4), (0, 11), (1, 3), (1, 4), (1, 11), (2, 4), (2, 8), (3, 4), (3, 8), (4, 1), (4, 7), (4, 8),
             (4, 9), (5, 1), (5, 7), (5, 8), (5, 9)]
finish = [(4, 12)]
start = [(1, 0)]

In [10]:
D = -1
T = +2
A = -0.04
rewards = [[A, A, A, D, D, A, A, A, A, A, A, D, A],
           [A, A, A, D, D, A, A, A, A, A, A, D, A],
           [A, A, A, A, D, A, A, A, D, A, A, A, A],
           [A, A, A, A, D, A, A, A, D, A, A, A, A],
           [A, D, A, A, A, A, A, D, D, D, A, A, T],
           [A, D, A, A, A, A, A, D, D, D, A, A, A]]

obstacles = [(0, 3), (0, 4), (0, 11), (1, 3), (1, 4), (1, 11), (2, 4), (2, 8), (3, 4), (3, 8), (4, 1), (4, 7), (4, 8),
             (4, 9), (5, 1), (5, 7), (5, 8), (5, 9)]
finish = [(4, 12)]
start = [(1, 0)]

In [11]:
def get_grid_1(D, T, A):
    return [[A, A, A, D, D, A, A, A, A, A, A, D, A],
            [A, A, A, D, D, A, A, A, A, A, A, D, A],
            [A, A, A, A, D, A, A, A, D, A, A, A, A],
            [A, A, A, A, D, A, A, A, D, A, A, A, A],
            [A, D, A, A, A, A, A, D, D, D, A, A, T],
            [A, D, A, A, A, A, A, D, D, D, A, A, A]]

In [12]:
grid_mdp = GridMDP(start_state=start, grid=rewards,
                   action_distribution=get_action_distribution(forward_prob=1, dim=2),
                   terminals=obstacles + finish, gamma=1)

In [13]:
def vis_iteration(grid_mdp, utilities):
    def vis(iteration):
        # Define the colors - each entry in the list corresponds to a point in the colormap range
        colors = ["salmon", "white", "lightblue"]

        # Create the colormap
        cmap = mcolors.LinearSegmentedColormap.from_list("utility", colors)

        fig, ax = plt.subplots(figsize=(8, 8), constrained_layout=True)
        data = utilities[iteration]
        grid = []
        for row in range(grid_mdp.rows):
            current_row = []
            for column in range(grid_mdp.cols):
                current_row.append(data[(column, 0, row)])
            grid.append(current_row)
        im = ax.imshow(grid, cmap=cmap, interpolation='nearest')
        ax.axis('off')

        for col in range(len(grid)):
            for row in range(len(grid[0])):
                value = grid[col][row]
                ax.text(row, col, f"{value:.2f}", va='center', ha='center')
                if (col, row) in obstacles:
                    rect = patches.Rectangle((row - 0.5, col - 0.5), 1, 1, linewidth=1, edgecolor='none',
                                             facecolor='black', alpha=0.5)
                    ax.add_patch(rect)

        fig.patch.set_facecolor('none')
        plt.show()

    return vis

# Value iteration

In [43]:
def value_iteration_instru(mdp, iterations=30):
    U_over_time = []

    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    for _ in range(iterations):
        U = U1.copy()
        for s in mdp.states:
            U1[s] = R(s) + gamma * max([sum([p * U[s1] for (p, s1) in T(s, a)])
                                        for a in mdp.actions(s)])
        U_over_time.append(U)
    return U_over_time


vi_result = value_iteration_instru(grid_mdp)


In [75]:
grid_mdp = GridMDP(start_state=start, grid=get_grid_1(-1.0, 1.0, -0.04),
                   action_distribution=get_action_distribution(forward_prob=0.8, dim=2),
                   terminals=obstacles + finish, gamma=0.95)
vi_result = value_iteration_instru(grid_mdp)
f = vis_iteration(grid_mdp, vi_result)
iteration_slider = widgets.IntSlider(min=1, max=len(vi_result) - 1, step=1, value=len(vi_result) - 1)
w = widgets.interactive(f, iteration=iteration_slider)
iteration_slider2 = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=1.0)
w2 = widgets.interactive(lambda iter: print(iter), iter=iteration_slider2)
display(w2)

display(w)

NameError: name 'vis_iteration' is not defined

In [23]:
# Define a function that takes two parameters, affected by two sliders.
def update_function(param1, param2):
    print(f"Parameter 1: {param1}, Parameter 2: {param2}")


# Create two sliders
iterations = widgets.IntSlider(min=0, max=10, step=1, value=5, description='Param1')
slider2 = widgets.IntSlider(min=10, max=20, step=1, value=15, description='Param2')

# Use interactive to link the function and the sliders.
interactive_plot = widgets.interactive(update_function, param1=iterations, param2=slider2)

# Display the interactive widget.
display(interactive_plot)

interactive(children=(IntSlider(value=5, description='Param1', max=10), IntSlider(value=15, description='Param…

In [93]:
def Q_value(mdp, s, a, U):
    gamma, T, R = mdp.gamma, mdp.T, mdp.R
    return sum([p * (U[s_next] * gamma + R(s_next)) for (p, s_next) in T(s, a)])


def value_iteration(mdp, epsilon):
    utilities_history = []
    policies_history = []

    U_current = {s: 0 for s in mdp.states}

    actions, gamma = mdp.actions, mdp.gamma
    while True:
        delta = 0.0
        U_previous = U_current.copy()
        for s in mdp.states:
            U_current[s] = max([Q_value(mdp, s, a, U_previous) for a in actions(s)])
            delta = max(delta, abs(U_current[s] - U_previous[s]))
        utilities_history.append(U_current)
        policies_history.append(None)
        print(delta, epsilon * (1 - gamma) / gamma)
        if delta <= epsilon * (1 - gamma) / gamma:
            return utilities_history, policies_history


grid_mdp = GridMDP(start_state=start, grid=get_grid_1(-1.0, 1.0, -0.04),
                   action_distribution=get_action_distribution(forward_prob=0.8, dim=2),
                   terminals=obstacles + finish, gamma=0.95)
value_iteration(grid_mdp, epsilon=1e-3)

0.7919999999999999 5.263157894736847e-05
0.6429599999999999 5.263157894736847e-05
0.5234339555555553 5.263157894736847e-05
0.42325618814814797 5.263157894736847e-05
0.3398125071061727 5.263157894736847e-05
0.2258684594521777 5.263157894736847e-05
0.16955550667361663 5.263157894736847e-05
0.1496665288776064 5.263157894736847e-05
0.13094764500839967 5.263157894736847e-05
0.11456799634628215 5.263157894736847e-05
0.09955459698403898 5.263157894736847e-05
0.08593562320374931 5.263157894736847e-05
0.07091012866726251 5.263157894736847e-05
0.06138388517101728 5.263157894736847e-05
0.05621605146478545 5.263157894736847e-05
0.048742566644185026 5.263157894736847e-05
0.04385955374336187 5.263157894736847e-05
0.03709514891285076 5.263157894736847e-05
0.03129040643695452 5.263157894736847e-05
0.026732024800800114 5.263157894736847e-05
0.024094207713017846 5.263157894736847e-05
0.021373534550907458 5.263157894736847e-05
0.017781918259232254 5.263157894736847e-05
0.013969598708720332 5.263157894736

([{(6, 0, 0): 0.015466188324319735,
   (4, 0, 1): 0.0,
   (8, 0, 2): 0.0,
   (10, 0, 5): 0.6249293109374141,
   (1, 0, 1): -0.5629282985195389,
   (3, 0, 3): -0.47667817487712316,
   (11, 0, 5): 0.8490536674726135,
   (2, 0, 1): -0.5762979972302581,
   (6, 0, 2): -0.07122451900355989,
   (4, 0, 3): 0.0,
   (8, 0, 4): 0.0,
   (12, 0, 0): 0.38149229601122375,
   (0, 0, 0): -0.5892233998624377,
   (2, 0, 3): -0.49529263833053305,
   (6, 0, 4): -0.24955028589141592,
   (4, 0, 5): -0.3509720261473402,
   (5, 0, 2): -0.18143233361441968,
   (7, 0, 4): 0.0,
   (10, 0, 0): 0.19832305545752166,
   (12, 0, 2): 0.8139889425751028,
   (0, 0, 2): -0.5612332842495587,
   (11, 0, 0): 0.0,
   (2, 0, 5): -0.4555709381638599,
   (5, 0, 4): -0.281319585868894,
   (0, 0, 4): -0.617440385523264,
   (11, 0, 2): 0.6145105552896546,
   (4, 0, 0): 0.0,
   (8, 0, 1): 0.12928358135688692,
   (9, 0, 1): 0.2899064867445127,
   (11, 0, 4): 0.9423926294363254,
   (1, 0, 3): -0.5509163771916438,
   (2, 0, 0): -0.6179

In [120]:
def Q_value(mdp, s, a, U):
    gamma, T, R = mdp.gamma, mdp.T, mdp.R
    return sum([p * (U[s_next] * gamma + R(s_next)) for (p, s_next) in T(s, a)])


def value_iteration(mdp, epsilon=1e-3):
    utilities_history = []
    policies_history = []

    U_current = mdp.reward.copy()

    actions, gamma = mdp.actions, mdp.gamma
    while True:
        delta = 0.0
        U_previous = U_current.copy()
        for s in (mdp.states - mdp.terminals):
            U_current[s] = max((Q_value(mdp, s, a, U_previous) for a in actions(s)))
            delta = max(delta, abs(U_current[s] - U_previous[s]))
        utilities_history.append(U_current.copy())
        policies_history.append(None)
        if delta <= epsilon * (1 - gamma) / gamma:
            return utilities_history


grid_mdp = GridMDP(start_state=start, grid=get_grid_1(-1.0, 1.0, -0.04),
                   action_distribution=get_action_distribution(forward_prob=0.8, dim=2),
                   terminals=obstacles + finish, gamma=0.95)
value_iteration(grid_mdp, epsilon=1e-3)

TypeError: unsupported operand type(s) for -: 'set' and 'list'

In [133]:
def value_iteration(mdp, epsilon=1e-3):
    utilities_history = []
    policies_history = []

    U_current = mdp.reward.copy()
    utilities_history.append(U_current.copy())

    actions, R, T, gamma = mdp.actions, mdp.R, mdp.T, mdp.gamma
    while True:
        delta = 0.0
        U_previous = U_current.copy()
        for s in mdp.states:
            U_current[s] = R(s) + gamma * max(sum(p * U_previous[s1] for (p, s1) in T(s, a))
                                              for a in mdp.actions(s))
            delta = max(delta, abs(U_current[s] - U_previous[s]))
        utilities_history.append(U_current.copy())
        policies_history.append(None)
        if delta <= epsilon * (1 - gamma) / gamma:
            return utilities_history

In [134]:
class Visualizer2D:
    def __init__(self, start_state, obstacles, finish_state, iteration_algorithm, vmin=-10.0, vmax=10.0):
        self.start_state = start_state
        self.obstacles = obstacles
        self.finish_state = finish_state
        self.terminals = obstacles + finish_state
        self.iteration_algorithm = iteration_algorithm

        self.vmin = vmin
        self.vmax = vmax
        self.kwargs = None
        self.utilities = None
        self.policies = None

        self.iteration_slider = widgets.IntSlider(min=1, max=None, step=1, value=None,
                                                  description='Iteration')

    def update(self, iteration, **kwargs):
        if self.kwargs != kwargs:
            grid = get_grid_1(D=kwargs['D'], T=kwargs['T'], A=kwargs['A'])
            distribution = get_action_distribution(forward_prob=kwargs['forward_prob'])
            mdp = GridMDP(start_state=self.start_state, grid=grid, action_distribution=distribution,
                          terminals=self.terminals, gamma=kwargs['gamma'])
            self.utilities = self.iteration_algorithm(mdp)
            self.iteration_slider.max = len(self.utilities)
            self.iteration_slider.value = len(self.utilities)
            self.kwargs = kwargs
        self.visualize(iteration)

    def visualize(self, iteration):
        # Define the colors - each entry in the list corresponds to a point in the colormap range
        colors = ["salmon", "white", "lightblue"]

        # Create the colormap
        cmap = mcolors.LinearSegmentedColormap.from_list("utility", colors)

        fig, ax = plt.subplots(figsize=(8, 8), constrained_layout=True)
        data = self.utilities[iteration - 1]
        grid = []
        for row in range(grid_mdp.rows):
            current_row = []
            for column in range(grid_mdp.cols):
                current_row.append(data[(column, 0, row)])
            grid.append(current_row)
        ax.imshow(grid, vmin=self.vmin, vmax=self.vmax, cmap=cmap, interpolation='nearest')
        ax.axis('off')

        for col in range(len(grid)):
            for row in range(len(grid[0])):
                value = grid[col][row]
                ax.text(row, col, f"{value:.2f}", va='center', ha='center')
                if (col, row) in obstacles:
                    rect = patches.Rectangle((row - 0.5, col - 0.5), 1, 1, linewidth=1, edgecolor='none',
                                             facecolor='black', alpha=0.5)
                    ax.add_patch(rect)

        fig.patch.set_facecolor('none')
        plt.show()


In [135]:

VMIN = -1.0
VMAX = 1.0

visualizer = Visualizer2D(start_state=start, obstacles=obstacles, finish_state=finish,
                          iteration_algorithm=value_iteration,
                          vmin=VMIN, vmax=VMAX,
                          )
# 
# iteration_slider = widgets.IntSlider(min=1, max=visualizer.iterations, step=1, value=visualizer.iterations,
#                                      description='Iteration')
D_slider = widgets.FloatSlider(min=VMIN, max=0.0, step=0.01, value=-1.0, description='Obstacles')
T_slider = widgets.FloatSlider(min=0.0, max=VMAX, step=0.01, value=1.0, description='Finish')
A_slider = widgets.FloatSlider(min=VMIN, max=VMAX, step=0.01, value=-0.04, description='Air')
forward_prob_slider = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.8,
                                          description='Forward prob')
gamma_slider = widgets.FloatSlider(min=0.01, max=1.0, step=0.01, value=0.9, description='Gamma')

interactive_plot = widgets.interactive(visualizer.update,
                                       iteration=visualizer.iteration_slider,
                                       D=D_slider,
                                       T=T_slider,
                                       A=A_slider,
                                       forward_prob=forward_prob_slider,
                                       gamma=gamma_slider
                                       )
display(interactive_plot)

interactive(children=(IntSlider(value=39, description='Iteration', max=39, min=1), FloatSlider(value=-1.0, des…

In [78]:
grid_mdp = GridMDP(start_state=start, grid=get_grid_1(-1.0, 1.0, -0.04),
                   action_distribution=get_action_distribution(forward_prob=0.8, dim=2),
                   terminals=obstacles + finish, gamma=0.95)
value_iteration(grid_mdp, epsilon=1e-3)

KeyboardInterrupt: 

In [32]:
get_action_distribution(forward_prob=0.4, dim=2)

[(0.4, <function __main__.keep_forward(vector)>),
 (0.19999999999999998, <function __main__.turn_up(vector)>),
 (0.19999999999999998, <function __main__.turn_down(vector)>),
 (0.19999999999999998, <function __main__.turn_backward(vector)>)]

In [26]:
# Create a text input widget
text_input = widgets.Text(
    value='',
    description='Enter value:',
    continuous_update=False  # This can be set to True if you want real-time updates
)

output = widgets.Output()


# Function to handle input
def handle_input(change):
    # Perform an action with the new input value
    # 'change' is a dictionary containing details about the state change,
    # including the new value under the key 'new'.
    with output:
        output.clear_output()
        print("Value entered:", change.new)


# Attach the function to the 'value' trait of the input box.
# This will fire the function whenever the user presses enter or shifts focus from the input box.
text_input.observe(handle_input, names='value')

# Display the text input box
display(text_input, output)

Text(value='', continuous_update=False, description='Enter value:')

Output()

In [33]:
value_iteration_instru(grid_mdp)

[{(6, 0, 0): 0,
  (4, 0, 1): 0,
  (8, 0, 2): 0,
  (10, 0, 5): 0,
  (1, 0, 1): 0,
  (3, 0, 3): 0,
  (11, 0, 5): 0,
  (2, 0, 1): 0,
  (6, 0, 2): 0,
  (4, 0, 3): 0,
  (8, 0, 4): 0,
  (12, 0, 0): 0,
  (0, 0, 0): 0,
  (2, 0, 3): 0,
  (6, 0, 4): 0,
  (4, 0, 5): 0,
  (5, 0, 2): 0,
  (7, 0, 4): 0,
  (10, 0, 0): 0,
  (12, 0, 2): 0,
  (0, 0, 2): 0,
  (11, 0, 0): 0,
  (2, 0, 5): 0,
  (5, 0, 4): 0,
  (0, 0, 4): 0,
  (11, 0, 2): 0,
  (4, 0, 0): 0,
  (8, 0, 1): 0,
  (9, 0, 1): 0,
  (11, 0, 4): 0,
  (1, 0, 3): 0,
  (2, 0, 0): 0,
  (3, 0, 5): 0,
  (6, 0, 1): 0,
  (4, 0, 2): 0,
  (8, 0, 3): 0,
  (7, 0, 1): 0,
  (9, 0, 3): 0,
  (1, 0, 5): 0,
  (2, 0, 2): 0,
  (4, 0, 4): 0,
  (5, 0, 1): 0,
  (7, 0, 3): 0,
  (9, 0, 5): 0,
  (0, 0, 1): 0,
  (10, 0, 2): 0,
  (12, 0, 4): 0,
  (3, 0, 0): 0,
  (5, 0, 3): 0,
  (7, 0, 5): 0,
  (0, 0, 3): 0,
  (10, 0, 4): 0,
  (11, 0, 1): 0,
  (1, 0, 0): 0,
  (3, 0, 2): 0,
  (5, 0, 5): 0,
  (9, 0, 0): 0,
  (0, 0, 5): 0,
  (1, 0, 2): 0,
  (3, 0, 4): 0,
  (6, 0, 3): 0,
  (7, 0, 0):

# Policy iteration

# 2D

# 3D