In [11]:
import numpy as np

NORTH = 0
SOUTH = 1
WEST = 2
EAST = 3

class Robot:
    """
    The world is a MxN grid and your goal is to reach the terminal state.
    
    For, example, a 4x4 grid could look like this:
    
    T . . .
    . . . .
    . o o .
    . x o T
    
    x is your current state, T is the terminal state and o is an obstacle
    
    You can take four different actions: moving north, south, west or east.
    Actions cannot take you off the grid or move through obstacles.
    
    When taking an action, you will receive a reward depending which state you arrived 
    (default reward is 0 for terminal state and -1 for other states)
    """
    
    def __init__(self, world_definition = None):
        if world_definition is None:
            self.rewards = {"T": 0.0, ".": -1.0}
            self.world = np.array (
                [["T", ".", ".", "."],
                 [".", ".", ".", "."],
                 [".", ".", ".", "."],
                 [".", ".", ".", "T"]])
        else:
            self.world, self.rewards = world_definition
        
        self.shape = self.world.shape
        self.MAX_Y = self.shape[0]
        self.MAX_X = self.shape[1]
    
        # State space
        self.nSp = np.prod(self.shape)
        self.S = [] # Set of all non-terminal states
        self.Sp = [] # Set of all states
        self.S_start = []
        
        # Action set
        self.A = [NORTH, SOUTH, WEST, EAST]
        self.nA = len(self.A)
        
        self.T = {} # Transition model: T(.,.): S x A --> S
        self.R = {} # Reward function: R(.,.): S x A --> R
    
        for state in range(self.nSp):
            self.T[state] = {a: [] for a in self.A} # T[state][action] = T(s,a)
            self.R[state] = {a: [] for a in self.A} # R[state][action] = R(s,a)
        
            if self.is_terminal(state):
                self.Sp.append(state)
                
                for action in self.A:
                    new_state_a = state # If state is terminal, we do not move from current state
                
                    # Get the reward of the updates state from the world
                    x, y = self._state2coord(new_state_a)
                    reward = self.rewards[self.world[y, x]] # The world is accessed by (row, column)
                
                    self.T[state][action] = new_state_a
                    self.R[state][action] = reward
            
            else:
                self.S.append(state)
                
                if self._is_start(state):
                    self.S_start.append(state)
                
                for action in self.A:
                    new_state_a = self._update_state(state, action) # state + action: agent moves to a new state
                
                    # Get the reward of the updates state from the world
                    x, y = self._state2coord(new_state_a)
                    reward = self.rewards[self.world[y, x]] # The world is accessed by (row, column)
                
                    self.T[state][action] = new_state_a
                    self.R[state][action] = reward
                
                
                
        self.Sp = self.Sp + self.S
        
    def take_action(self, state, action):
        """
        
        """
        new_state_a = self.T[state][action]
        reward = self.R[state][action]
        
        return new_state_a, reward
    
    def reset(self, random_initial_state = False):
        """
        
        """
        if random_initial_state:
            self._current_state = np.random.choice(self.S, size=1)[0]
        else:
            self._current_state = self.S_start[0]
            
        return self._current_state
    
    def _update_state(self, state, action):
        """
        The origin of the gridworld is in the south-west corner
        parameters: 
            state: index of state s
            action: index of action a
        
        return:
            index of the new state s'
        """
        
        x, y = self._state2coord(state)
        
        if action == NORTH:
            new_xy = (x,y-1)
        elif action == SOUTH:
            new_xy = (x,y+1)
        elif action == WEST:
            new_xy = (x-1,y)
        elif action == EAST:
            new_xy = (x+1,y)
        else:
            raise ValueError ("Invalid action %d." % action)
            
        if self._is_outside_at(new_xy) or self._is_obstacle_at(new_xy):
            # If the updated step would be outside the gridworld or on top of an obstacle, the state remains unchainged
            return self._coord2state ((x, y)) 
        else:
            return self._coord2state(new_xy)
        
    def _is_start(self, state):
        """
        
        """
        x, y = self._state2coord (state)
        return self.world[y, x] == "S"

    def is_terminal(self, state):
        """
        Determine if current state is a terminal state
        parameters: 
            state: index of state s
        return:
            boolean: true if state is terminal, false otherwise
        """
        
        x, y = self._state2coord(state)
        return self.world[y][x] == "T"
    
    def _is_outside_at(self, xy):
        """
        Determine if current state is a outside gridworld boundaries
        parameters: 
            state: index of state s
        return:
            boolean: true if state is outside the gridworld, false otherwise
        """
        x = xy[0]
        y = xy[1]
        
        return y < 0 or x < 0 or x >= self.MAX_X or y >= self.MAX_Y
    
    def _is_obstacle_at(self, xy):
        """
        Determine if there is an obstacle at the current state
        parameters: 
            state: index of state s
        return:
            boolean: true if current coordinates have an obstacle, false otherwise
        """
        y, x = xy[1], xy[0]
        
        return self.world[y][x] == "o"
        
    def _state2coord(self, state):
        """
        Converts a flat index (state) into a tuple of coordinate arrays.
        parameters: 
            state: index of state s
        return:
            tuple: state coordinates (x,y)
        """
        
        yx = np.unravel_index(state, self.shape, order="C")
        return (yx[1], yx[0])
    
    def _coord2state(self, xy):
        """
        Converts a tuple of index arrays into an array of flat indices, applying boundary modes to the multi-index.
        parameters: 
            tuple: state coordinates (x,y)
        return:
            state: index of state s
        """
        
        return np.ravel_multi_index((xy[1], xy[0]), self.shape, order="C")
    

In [12]:
def calculate_value_for_action(env, s, V, discount_factor):
    """
    Function to calculate the value of all actions in a given state
    parameters: 
        env: instance of the Robot class
        s: index of current state s
        V: vector (shape (nSp,)) with the current value function estimate
        discoun_factor: discount factor gamma
    return:
        vector (shape (nA,)) containing the value for each action
    """
    
    # Vector to store all the values
    A = np.zeros(env.nA)
    
    # Calculate the value for each action a in A
    for action in range(0, env.nA):
        # Get the reward from the reward function R(s,a) of the environment
        reward = env.R[s][action]
        # Get the new state from the transition model T(s,a)
        new_state_a = env.T[s][action]
        # Calculate R(s,a) + \gamma * v(T(s,a))
        A[action] = reward + discount_factor * V[new_state_a]
    
    return A

def value_iteration(env, theta=0.0001, discount_factor = 0.9):
    """
    Value-iteration algorithm
    parameters: 
        env: instance of the Robot class
        theta: convergence threshold
        discoun_factor: discount factor gamma
    return:
        vector (shape (nSp,)) containing the value-function estimate
    """
    
    # Initialize value-function V(s) for each state to be zero
    V = np.zeros(env.nSp)
    k = 0
    
    while True:
        delta = 0
        
        for s in env.S:
            v = V[s]
            
            # Calculate R(s,a) + \gamma * v(T(s,a)) for each action a in A
            A = calculate_value_for_action(env, s, V, discount_factor)
            
            # Find the best action from A and assign its value to V(s)
            V[s] = np.amax(A)
            
            # Update the delta
            delta = max(delta, abs(V[s] - v))
        
        k = k + 1
        
        # Check if we should stop
        if delta < theta:
            break
    
    print ("Iterations (value iteration): %d" % k)
    
    return V

def extract_policy(env, V, discount_factor = 0.9):
    """
    Find the optimal policy using a greedy strategy.

    If two actions have the same value for a certain state s,
    the action is chosen in the order of their indices.

    parameters: 
        env: instance of the Robot class
        V: vector (shape (nSp,)) with the current value function estimate
        discoun_factor: discount factor gamma
    return:
        policy: (shape (nSp,)) has the optimal action a in state s
        Q: shape (nSp, nA), action-value-function containing the value of action a in state s
    """
    # pi(.): S --> A
    policy = np.zeros(env.nSp)
    
    # Q is the action-value-function
    Q = np.zeros([env.nSp, env.nA])
    
    for s in env.S:
        # Calculate R(s,a) + \gamma * v(T(s,a)) for each action a in A
        A = calculate_value_for_action(env, s, V, discount_factor)
        # Get the index of the best action
        best_action = np.argmax(A)
        # The best action is always chosen for each state s
        policy[s] = best_action
        
        Q[s,:] = A
    
    return policy, Q

In [13]:
def create_example_world(type = "4x4_world"):
    if type == "4x4_world":
        rewards = {"T": 0.0, ".": -1.0, "o": -1.0, "S": -1.0}
        world = np.array (
            [["T", ".", ".", "."],
             ["o", ".", ".", "o"],
             ["S", ".", ".", "."],
             [".", ".", ".", "T"]])

    elif type == "obstacles":
        rewards = {"T": 0.0, ".": -1.0, "o": -1.0, "S": -1.0}
        world = np.array (
            [[".", ".", ".", ".", ".", "."],
             [".", "o", ".", ".", "S", "."],
             [".", "o", "o", ".", ".", "."],
             [".", ".", "o", ".", ".", "."],
             [".", ".", "o", "o", "o", "o"],
             [".", ".", ".", ".", ".", "T"]])

    elif type == "6x6_world_blocked":
        rewards = {"T": 0.0, ".": -1.0, "o": -1.0, "S": -1.0}
        world = np.array (
            [["T", ".", ".", ".", ".", "."],
             [".", ".", ".", ".", ".", "S"],
             [".", ".", "o", "o", "o", "o"],
             [".", ".", "o", ".", ".", "."],
             [".", ".", "o", ".", ".", "."],
             [".", ".", "o", ".", ".", "."]])

    elif type == "6x4_world_2":
        rewards = {"T": 0.0, ".": -1.0, "o": -1.0, "x": -5.0, "S": -1.0}
        world = np.array (
            [["T", ".", ".", "."],
             ["x", "x", "x", "x"],
             ["x", "x", ".", "x"],
             ["x", "x", ".", "x"],
             ["x", "x", ".", "x"],
             ["S", ".", ".", "."]])
    else:
        raise ValueError ("Invalid example world type: %s" % type)

    return world, rewards

discount_factor = 0.9

for type in ["6x6_world_blocked"]:
    gworld = create_example_world(type)
    env = Robot(gworld)

    V = value_iteration(env, discount_factor = discount_factor)
    policy, Q = extract_policy(env, V, discount_factor = discount_factor)

    print("World: ")
    print(np.array(env.world).reshape(env.shape))
    print("Optimal policy: ")
    print(policy.reshape(env.shape))
    print("Actions: NORTH=0, SOUTH=1, WEST=2, EAST=3")


Iterations (value iteration): 89
World: 
[['T' '.' '.' '.' '.' '.']
 ['.' '.' '.' '.' '.' 'S']
 ['.' '.' 'o' 'o' 'o' 'o']
 ['.' '.' 'o' '.' '.' '.']
 ['.' '.' 'o' '.' '.' '.']
 ['.' '.' 'o' '.' '.' '.']]
Optimal policy: 
[[0. 2. 2. 2. 2. 2.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 2. 0. 0. 0.]
 [0. 0. 2. 0. 0. 0.]
 [0. 0. 2. 0. 0. 0.]]
Actions: NORTH=0, SOUTH=1, WEST=2, EAST=3


In [14]:
def q_learning(robot, init_state, discount_factor=1.0, num_iterations=100000):
    """
    
    """
    
    Q = np.zeros((robot.nSp, robot.nA))
    
    i = 0
    
    cur_state = init_state
    while i <= num_iterations:
        i += 1
        
        # Choose the next action according to the policy (random policy)
        action = np.random.choice(robot.A, 1)[0]
        
        # Get the state and reward by taking action from the current state
        new_state, reward = robot.take_action(cur_state, action)
        
        # Update current Q value for current state and action
        Q[cur_state][action] = reward + discount_factor * np.max(Q[new_state])
        
        
        
        if robot.is_terminal(new_state):
            cur_state = robot.reset()
        else:
            cur_state = new_state
        
    return Q

def get_optimal_path(robot, Q):
    
    path = []
    init_state = robot.S_start[0]
    
    cur_state = init_state
    path.append(cur_state)
    
    while not robot.is_terminal(cur_state):
        action = np.argmax(Q[cur_state]) # get best action
        cur_state, r = robot.take_action(cur_state, action)
        path.append(cur_state)
        
    return path
    
    

In [119]:
import svgwrite

def best_directions(x,y, Q, robot):
    s = robot._coord2state((x,y))
    actions = np.argwhere(Q[s] == np.amax(Q[s]))
    return actions.flatten().tolist()


def draw_policy(robot, Q):
    square_size = 90
    width = 3
    half_size = np.ceil(square_size/2)
    y,x = robot.shape
    cellcolor = "white"
    white = "white"
    T_color = "grey"
    o_color = "black"
    S_color = "green"
    x_color = "red"

    dwg = svgwrite.Drawing('policy.svg', profile='full')
    for i in range(0,x):
        for w in range(0,y):
            if robot.world[w,i] == "S":
                cellcolor = S_color
            elif robot.world[w,i] == "T":
                cellcolor = T_color
            elif robot.world[w,i] == "o":
                cellcolor = o_color
            elif robot.world[w,i] == "x":
                cellcolor = x_color
            else:
                cellcolor = white
            dwg.add(dwg.rect((i*square_size, w*square_size), (square_size, square_size), fill=cellcolor, stroke='black'))

            if robot.world[w, i] in ["T", "o"]:
                    continue

            center_x = i * square_size + half_size
            center_y = w * square_size + half_size

            a = best_directions(i, w, Q, robot)
            for q in range(0, robot.nA):
                if NORTH in a:
                    dwg.add(dwg.polyline([(center_x, center_y), (center_x, center_y - half_size)], stroke = "black", stroke_width = width, fill = "none"))
                if SOUTH in a:
                    dwg.add(dwg.polyline([(center_x, center_y), (center_x, center_y + half_size)], stroke = "black", stroke_width = width, fill = "none"))
                if WEST in a:
                    dwg.add(dwg.polyline([(center_x, center_y), (center_x - half_size, center_y )], stroke = "black", stroke_width = width, fill = "none"))
                if EAST in a:
                    dwg.add(dwg.polyline([(center_x, center_y), (center_x + half_size, center_y)], stroke = "black", stroke_width = width, fill = "none"))

    dwg.add(dwg.text('Test', insert=(0, 0.2), fill='red'))
    dwg.save()

In [121]:
discount_factor = 0.9
robot = Robot(create_example_world("6x6_world_blocked")) # 4x4_world, 6x4_world_2 6x6_world_blocked
Q = q_learning(robot, robot.S_start[0], discount_factor)
print ("Action-value function: ")
print(Q)
path = get_optimal_path(robot, Q)
print ("Optimal path: ")
print(list(map(robot._state2coord, path)))
draw_policy(robot,Q)

Action-value function: 
[[ 0.       0.       0.       0.     ]
 [-1.      -1.9      0.      -1.9    ]
 [-1.9     -2.71    -1.      -2.71   ]
 [-2.71    -3.439   -1.9     -3.439  ]
 [-3.439   -4.0951  -2.71    -4.0951 ]
 [-4.0951  -4.68559 -3.439   -4.0951 ]
 [ 0.      -1.9     -1.      -1.9    ]
 [-1.      -2.71    -1.      -2.71   ]
 [-1.9     -2.71    -1.9     -3.439  ]
 [-2.71    -3.439   -2.71    -4.0951 ]
 [-3.439   -4.0951  -3.439   -4.68559]
 [-4.0951  -4.68559 -4.0951  -4.68559]
 [-1.      -2.71    -1.9     -2.71   ]
 [-1.9     -3.439   -1.9     -2.71   ]
 [ 0.       0.       0.       0.     ]
 [ 0.       0.       0.       0.     ]
 [ 0.       0.       0.       0.     ]
 [ 0.       0.       0.       0.     ]
 [-1.9     -3.439   -2.71    -3.439  ]
 [-2.71    -4.0951  -2.71    -3.439  ]
 [ 0.       0.       0.       0.     ]
 [ 0.       0.       0.       0.     ]
 [ 0.       0.       0.       0.     ]
 [ 0.       0.       0.       0.     ]
 [-2.71    -4.0951  -3.439   -4.0951 ]
 