## Exercise 2: Reinforcement Learning
Importing the libraries

In [10]:
import random
import numpy as np
import matplotlib.pyplot as plt

Creating the classes and defining the method to print the output

In [11]:
class State():
    def __init__(self, type: str, pos: tuple) -> None:
        self.pos = pos # (x,y)
        self.type = type
        self.terminal = False
        self.reward = None 
        self.actions = ['left', 'right', 'up', 'down'] 
        self.Q = {'left': 0, 'right': 0, 'up': 0, 'down': 0} 
        self.model = {'left': [], 'right': [], 'up': [], 'down': []} 

    def init_state(self):
        # starting cell  
        if self.type == 'start':
            self.reward = -10000
        # normal cell
        if self.type == 'cell':
            self.reward = -1
        # cliff cell -> gotta return back starting state 
        if self.type == 'cliff': 
            self.reward = -100
            self.terminal = True
        # goal cell
        if self.type == 'goal':
            self.reward = 20
            self.Q['left'] = 1
            self.terminal = True
        # snake cell 
        if self.type == 'snake':
            self.reward = -100
            #self.terminal = True does it terminate the episode?

    def policy(self, epsilon=0.1) -> str:

        # compute Q(s,a) value for all actions & get max 
        Q = [self.Q[a] for a in self.actions]
        max_qa = max(Q)
        max_qa_id = Q.index(max_qa)
        # selects action based on e-greedy 
        if random.random() < 1-epsilon:
            return self.actions[max_qa_id]
        else: 
            return random.choice(self.actions)


class CliffWorld:
    def __init__(self, size: tuple = (4,21)) -> None:
        # parameters
        self.lr = 0.4
        self.discount = 0.9 #put sizes
        self.rows = size[0]
        self.cols = size[1] 
        self.grid = [[None for j in range(self.cols)] for i in range(self.rows)]
        self.actions = ['left', 'right', 'up', 'down']
        self._init_world()
        
    def add_state(self, type: str, pos: tuple):
        # adds state to grid according to cell type 
        state = State(type, pos)
        state.init_state()
        x, y = pos[0], pos[1]
        self.grid[x][y] = state
    
    def _init_world(self): 
        # add cliff
        for i in range(1,20):
            self.add_state('cliff', (3,i))
        # add goal and snake
        self.add_state('start', (3,0))
        self.add_state('goal', (3,20))
        #self.add_state('snake', (0,11))
        # add cell states 
        for i, row in enumerate(self.grid):
            for j, val in enumerate(row): 
                if val is None:
                    self.add_state('cell', (i,j))
    

    def get_state(self, pos: tuple) -> State:
        x, y = pos[0], pos[1]
        return self.grid[x][y]
    
    def move_position(self, current_pos: tuple, action: str) -> tuple:
        # state = self.get_state(current_pos)
        x, y = current_pos[0], current_pos[1]
        # possible to pick any action, but if pick edges -> stay in spot & if pick wall -> stay in spot 
        if action == 'left':
            next_pos = (x, y-1)
        elif action == 'right':
            next_pos = (x, y+1)
        elif action == 'up':
            next_pos = (x-1, y)
        elif action == 'down':
            next_pos = (x+1, y)
        # check for boundary/wall conditions 
        if next_pos[0] < 0 or next_pos[0] > 3 or next_pos[1] < 0 or next_pos[1] > 20:
            return current_pos
        if self.get_state(next_pos).type == 'cliff':
            self.get_state((3,0)).terminal = True
            next_pos = (3,0)
            
        return next_pos
    
def show_policy(cw: CliffWorld):
    temp = np.zeros((cw.rows,cw.cols), str)
    for i in range(cw.rows):
        for j in range(cw.cols):
            cell = cw.get_state((i,j))
            max_item = max(cell.Q.items(), key=lambda x: x[1])
            if max_item[1] == 0:
                temp[i][j] ='0'
            elif max_item[1] == 1:
                temp[i][j] ='G'
            elif max_item[0] == 'right':
                temp[i][j] ='>'
            elif max_item[0] == 'left':
                temp[i][j] ='<'
            elif max_item[0] == 'down':
                temp[i][j] ='v'
            elif max_item[0] == 'up':
                temp[i][j] ='^'
       
    for i in range(cw.rows):
            print('-------------------------------------------------------------------------------------')
            out = '| '
            for j in range(cw.cols):
                out += temp[i][j] + ' | '
            print(out)
    print('-------------------------------------------------------------------------------------')

Defining the methods for the SARSA algorithm and Q-Learning 

In [21]:
def SARSA(grid_world, episodes=10000): 
    action_counter_total = []
    reward_counter_total = []
    # episodes  
    for _ in range(episodes):
        # initialize agent position at the start
        agent_pos = (3,0)
        
        # get state for the agent 
        s = grid_world.get_state(agent_pos)

        # -- 1 episode -- 
        action_counter = 0
        reward_counter = 0
        s.terminal = False
        while s.terminal == False:
          # get the policy for that s -> gives a 
            a = s.policy(0.05)
            # take an action -> arrive in s'
            next_pos = grid_world.move_position(agent_pos, a)
            s_prime = grid_world.get_state(next_pos)
            # get transition reward 
            r = s_prime.reward
            # get policy for s' -> gives a'
            a_prime = s_prime.policy(0.05)
            # Q(s,a) update 
            new_q_sa = s.Q[a] + grid_world.lr  * (r + (grid_world.discount * s_prime.Q[a_prime]) - s.Q[a])
            s.Q[a] = new_q_sa
            # transitions for next iteration 
            s, a, agent_pos = s_prime, a_prime, next_pos
            # counters 
            action_counter += 1
            reward_counter += r

        action_counter_total.append(action_counter)
        reward_counter_total.append(reward_counter)

    return grid_world, action_counter_total, reward_counter_total


def Q_learning(grid_world, episodes=10000):
    action_counter_total = []
    reward_counter_total = []
    # -- episodes -- 
    for _ in range(episodes):
        # initialize agent position 
        agent_pos = (3,0)
 
        # get state for the agent 
        s = grid_world.get_state(agent_pos)
        s.terminal = False
        # -- 1 episode -- 
        action_counter = 0
        reward_counter = 0
        while s.terminal == False:
            # get the policy for that s -> gives a 
            a = s.policy()
            # take an action -> arrive in s'
            next_pos = grid_world.move_position(agent_pos, a)
            s_prime = grid_world.get_state(next_pos)
            # get transition reward 
            r = s_prime.reward
            # get max Q(s, a')
            max_a_prime = max(s.Q.items(), key=lambda x: x[1])[0]
            # Q(s,a) update 
            new_q_sa = s.Q[a] + grid_world.lr  * (r + (grid_world.discount * s_prime.Q[max_a_prime]) - s.Q[a])
            s.Q[a] = new_q_sa
            # transitions for next iteration 
            s, agent_pos = s_prime, next_pos
            # counters
            action_counter += 1
            reward_counter += r
        action_counter_total.append(action_counter)
        reward_counter_total.append(reward_counter)

    return grid_world, action_counter_total, reward_counter_total

### Experiment 
1. SARSA with epsilon = 0.1, without the snake 

In [13]:
cw = CliffWorld()
cw, _, __ = SARSA(cw) # epsilon = 0.1
show_policy(cw)

-------------------------------------------------------------------------------------
| > | v | < | < | < | v | < | < | < | > | > | > | > | > | > | > | > | > | > | > | v | 
-------------------------------------------------------------------------------------
| > | ^ | ^ | ^ | > | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | v | 
-------------------------------------------------------------------------------------
| ^ | ^ | ^ | > | > | ^ | ^ | ^ | ^ | < | ^ | ^ | ^ | ^ | ^ | ^ | > | ^ | < | > | v | 
-------------------------------------------------------------------------------------
| ^ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | G | 
-------------------------------------------------------------------------------------


1. Q-Learning 


In [20]:
cw = CliffWorld()
cw, _, __ = Q_learning(cw) 
show_policy(cw)

-------------------------------------------------------------------------------------
| > | > | > | > | > | > | > | v | > | > | > | > | > | > | v | v | > | v | v | > | v | 
-------------------------------------------------------------------------------------
| > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | v | 
-------------------------------------------------------------------------------------
| > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | > | v | 
-------------------------------------------------------------------------------------
| ^ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | G | 
-------------------------------------------------------------------------------------


2. SARSA with epsilon = 0.05

In [23]:
cw = CliffWorld()
cw, _, __ = SARSA(cw) # epsilon = 0.0.5
show_policy(cw)

-------------------------------------------------------------------------------------
| > | > | > | > | > | > | > | ^ | > | > | > | > | > | > | > | > | > | > | > | > | v | 
-------------------------------------------------------------------------------------
| > | > | ^ | ^ | ^ | ^ | ^ | ^ | > | ^ | ^ | ^ | > | ^ | ^ | > | > | > | ^ | ^ | v | 
-------------------------------------------------------------------------------------
| ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | > | > | ^ | ^ | ^ | ^ | ^ | ^ | ^ | ^ | < | v | 
-------------------------------------------------------------------------------------
| ^ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | G | 
-------------------------------------------------------------------------------------
