In [12]:
import numpy as np
from operator import itemgetter

class MDP:
    
    """
    Modèle de processus de décision markovien pour un environnement avec des états, des récompenses et des transitions.

    Attributes:
        n_rows (int): Nombre de lignes dans la grille.
        n_cols (int): Nombre de colonnes dans la grille.
        rewards (np.array): Matrice des récompenses pour chaque état.
        terminal_states (list): Liste des états terminaux.
        wall (tuple): Position de l'obstacle sur la grille.
    """
    
    def __init__(self, n_rows=3, n_cols=4):
        """Initialisation du MDP avec la grille spécifiée, les états terminaux et les récompenses."""

        self.n_rows = n_rows
        self.n_cols = n_cols
        
        # Initialisation des récompenses avec des zéros puis définition des récompenses pour les états terminaux.
        self.rewards = np.zeros((n_rows, n_cols))
        self.rewards[0][3] = +1
        self.rewards[1][3] = -1
        self.terminal_states = [(0, 3), (1, 3)]
        self.wall = (1, 1)
        
    def valid_transition(self, state):
        x, y = state
        return 0 <= x < self.n_rows and 0 <= y < self.n_cols and state != self.wall
    
    def get_states(self):
        states = [(i,j) for i in range(self.n_rows) for j in range(self.n_cols) if (i,j) not in [(0,3), (1,3), (1,1)]]        
        return states
    
    
    def get_actions(self, state):
        """
        return realisables actions from this state
        """
        x, y = state
        
        if x == 0 and y == 1:
            return ['L', 'R']
        
        if x == 1 and y == 0:
            return ['T', 'B']
        
        if x == self.n_rows - 1 and y == 1:
            return ['L', 'R']
        
        if x == 1 and y == 2:
            return ['T', 'B']
        
        if x == 0 and y == 0:
            return ['B', 'R']
    
        if x == 0 and y == self.n_cols - 1:
            return ['B', 'L']

        if x == 0 and 0 < y < self.n_cols:
            return ['B', 'R', 'L']

        if x == self.n_rows - 1 and y == 0:
            return ['T', 'R']

        if x == self.n_rows - 1 and y == self.n_cols - 1:
            return ['T', 'L']

        if x == self.n_rows - 1 and 0 < y < self.n_cols:
            return ['T', 'R', 'L']

        if y == 0 and 0 < x < self.n_rows:
            return ['T', 'B', 'R']

        if y == self.n_cols - 1 and 0 < x < self.n_rows:
            return ['T', 'B', 'L']

        if 0 < x < self.n_rows and 0 < y < self.n_cols:
            return ['T', 'B', 'R', 'L']
        
        
    def get_transitions(self, state, action):
        """
        return new_state, probability
        """
        x, y = state
        dict_action_next_coord = {'T': (x - 1, y), 
                                 'B': (x+1, y),
                                 'L': (x, y-1),
                                 'R': (x, y+1)}
        
        possible_actions = self.get_actions(state)
        good_direction = action

        new_state_and_proba = [(dict_action_next_coord.get(action), 0.8)]
        
        if len(possible_actions) == 1:
            
            return [(dict_action_next_coord.get(action), 1.0)]

        if len(possible_actions) < 4:
            
            rest_of_direction = [d for d in possible_actions if d != good_direction]
            
            
            
            
            if len(rest_of_direction) > 0:
                for act in rest_of_direction:
                    new_state_and_proba.append((dict_action_next_coord.get(act), 0.2/len(rest_of_direction)))
                    
            return new_state_and_proba
        else: 
            if action == 'T' or action == 'B':
                rest_of_direction = ['R', 'L']
            else:
                rest_of_direction = ['B', 'T']
            
            for act in rest_of_direction:
                    new_state_and_proba.append((dict_action_next_coord.get(act), 0.2/len(rest_of_direction)))
                
            return new_state_and_proba
        
    def get_discount_factor(self):
        return 0.9
    
    def get_reward(self, state, action):
        x, y = state
        return self.rewards[x][y]
            
        
class Values:
    
    def __init__(self, n_rows=3, n_cols=4):
        self.V = np.zeros((n_rows, n_cols))
        self.V[0][3] = +1
        self.V[1][3] = -1
        
    def get_value(self, state):
        x, y = state
        return self.V[x][y]


    
class ValueIteration():
    """
        Classe pour effectuer l'algorithme d'itération de valeur sur un MDP.

        Attributes:
            mdp (MDP): L'instance du MDP sur lequel l'algorithme est appliqué.
            values (Values): L'instance des valeurs des états à mettre à jour.
    """
    def __init__(self, mdp, values):
        self.mdp = mdp
        self.values = values
        
        
    def value_iteration(self, max_iterations=100):
        
        for i in range(max_iterations):
            for state in self.mdp.get_states():     
                qtable = []
                for action in self.mdp.get_actions(state):
                   
                    new_value = 0.0
                    reward = self.mdp.get_reward(state, action)
                    for new_state, proba in self.mdp.get_transitions(state, action):
                        print(f"new_state = {new_state}, proba = {proba}")
                        
                        new_value += proba * self.mdp.get_discount_factor() * self.values.get_value(new_state)
                    
                    new_value += reward
                    qtable.append((state, action, new_value))

                max_q = max(qtable, key=itemgetter(2))[2]
                x, y = state
                self.values.V[x][y] = max_q

In [13]:
mdp = MDP()
values = Values()
v_iteration = ValueIteration(mdp, values)
v_iteration.value_iteration()
v_iteration.values.V

new_state = (1, 0), proba = 0.8
new_state = (0, 1), proba = 0.2
new_state = (0, 1), proba = 0.8
new_state = (1, 0), proba = 0.2
new_state = (0, 0), proba = 0.8
new_state = (0, 2), proba = 0.2
new_state = (0, 2), proba = 0.8
new_state = (0, 0), proba = 0.2
new_state = (1, 2), proba = 0.8
new_state = (0, 3), proba = 0.1
new_state = (0, 1), proba = 0.1
new_state = (0, 3), proba = 0.8
new_state = (1, 2), proba = 0.1
new_state = (0, 1), proba = 0.1
new_state = (0, 1), proba = 0.8
new_state = (1, 2), proba = 0.1
new_state = (0, 3), proba = 0.1
new_state = (0, 0), proba = 0.8
new_state = (2, 0), proba = 0.2
new_state = (2, 0), proba = 0.8
new_state = (0, 0), proba = 0.2
new_state = (0, 2), proba = 0.8
new_state = (2, 2), proba = 0.2
new_state = (2, 2), proba = 0.8
new_state = (0, 2), proba = 0.2
new_state = (1, 0), proba = 0.8
new_state = (2, 1), proba = 0.2
new_state = (2, 1), proba = 0.8
new_state = (1, 0), proba = 0.2
new_state = (2, 0), proba = 0.8
new_state = (2, 2), proba = 0.2
new_stat

array([[ 0.61505162,  0.72235085,  0.84950216,  1.        ],
       [ 0.52755004,  0.        ,  0.71656204, -1.        ],
       [ 0.4706271 ,  0.50439481,  0.58289157,  0.23968193]])

In [3]:
class OptimalPolicy:
    
    def __init__(self, mdp, values):
        self.mdp = mdp
        self.values = values
        self.policy = {}  # Dictionary to store optimal action for each state
        
    def compute_optimal_policy(self):
        for state in self.mdp.get_states():
            if state in [(1, 1), (0, 3), (1, 3)]:
                continue  
            
            optimal_action = None
            max_expected_reward = float('-inf')
            
            for action in self.mdp.get_actions(state):
                expected_reward = 0.0
                
                for new_state, proba in self.mdp.get_transitions(state, action):
                    expected_reward += proba * ( self.mdp.get_discount_factor() * self.values.get_value(new_state))
                
                if expected_reward > max_expected_reward:
                    max_expected_reward = expected_reward
                    optimal_action = action
            
            self.policy[state] = optimal_action
    
    def get_optimal_policy(self):
        return self.policy


In [4]:
optimal_policy = OptimalPolicy(mdp, values)
optimal_policy.compute_optimal_policy()
policies = optimal_policy.get_optimal_policy()
policies

{(0, 0): 'R',
 (0, 1): 'R',
 (0, 2): 'R',
 (1, 0): 'T',
 (1, 2): 'T',
 (2, 0): 'T',
 (2, 1): 'R',
 (2, 2): 'T',
 (2, 3): 'L'}

In [5]:
mdp.rewards

array([[ 0.,  0.,  0.,  1.],
       [ 0.,  0.,  0., -1.],
       [ 0.,  0.,  0.,  0.]])

In [6]:
v_iteration.values.V

array([[ 0.61505162,  0.72235085,  0.84950216,  1.        ],
       [ 0.52755004,  0.        ,  0.71656204, -1.        ],
       [ 0.4706271 ,  0.50439481,  0.58289157,  0.23968193]])

In [7]:
direction_map = {'R': '\u2192', 'L': '\u2190', 'T':'\u2191', 'B': '\u2193' }

for i in range(3):
    for j in range(4):
        d = policies.get((i, j), None)
        if d is not None:
            print(direction_map.get(d), end="\t")
        else:
            if (i,j) == mdp.wall: 
                print('\u2584', end="\t")
            elif (i, j) == (0, 3):
                print('\u263A', end="\t")
            elif (i, j) == (1, 3):
                print('\u25D9', end="\t")
            else:
                print("*", end="\t")
    print()
    
    

→	→	→	☺	
↑	▄	↑	◙	
↑	→	↑	←	
