In [1]:
# This notebook is based on an exercise during the MachLe Course of Prof. Christoph Würsch. Please do not distribute.

In [11]:
import numpy as np
import matplotlib.pyplot as plt

In [12]:
# --- Grid-world configuration -------------------------------------------------
# Positions are (row, col) tuples with (0, 0) in the top-left corner.
BOARD_ROWS = 3
BOARD_COLS = 4

# Both terminal cells sit in the right-most column. The column index is derived
# from BOARD_COLS (not BOARD_ROWS) so it stays correct if the board is resized.
WIN_STATE = (0, BOARD_COLS - 1)
LOSE_STATE = (1, BOARD_COLS - 1)

# Cell in which every episode starts.
START = (2, 0)

# False: the environment may replace the requested action by a perpendicular one.
DETERMINISTIC = False

# True: print every step and the final reward while playing.
DO_PRINT = False

In [13]:
class Environment:
    """Grid world of BOARD_ROWS x BOARD_COLS cells with one blocked cell.

    A position is a ``(row, col)`` tuple. ``self.board`` holds 0 for free cells
    and -1 for the blocked cell at (1, 1). Reaching WIN_STATE yields reward +1,
    reaching LOSE_STATE yields -1, every other cell yields 0.

    When ``self.determine`` is False, a requested action is executed as-is with
    probability 0.8 and replaced by one of the two perpendicular actions with
    probability 0.1 each.
    """

    # Blocked cell that can never be entered.
    _WALL = (1, 1)

    # (row, col) offset per action. Any other action value is treated like
    # "right", which is what the original if/elif/else chain did.
    _MOVES = {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}

    # The two perpendicular actions an intended action may slip to.
    _SLIPS = {
        "up": ("left", "right"),
        "down": ("left", "right"),
        "left": ("up", "down"),
        "right": ("up", "down"),
    }

    def __init__(self, state=START):
        """Create an environment whose current position is ``state``."""
        self.board = np.zeros([BOARD_ROWS, BOARD_COLS])
        self.board[self._WALL] = -1
        self.state = state
        self.isEnd = False
        self.determine = DETERMINISTIC

    def giveReward(self):
        """Return +1 in WIN_STATE, -1 in LOSE_STATE and 0 anywhere else."""
        if self.state == WIN_STATE:
            return 1
        if self.state == LOSE_STATE:
            return -1
        return 0

    def isEndFunc(self):
        """Set ``self.isEnd`` to True if the current position is terminal.

        The flag is only ever raised here, never cleared.
        """
        if self.state in (WIN_STATE, LOSE_STATE):
            self.isEnd = True

    def _chooseActionProb(self, action):
        """Sample the action that is actually executed for an intended ``action``.

        Returns ``action`` with probability 0.8 and each perpendicular action
        with probability 0.1. An unrecognised ``action`` is returned unchanged
        (and is then handled by the fallback in ``nxtPosition``).
        """
        slips = self._SLIPS.get(action)
        if slips is None:
            return action
        # str() turns numpy's np.str_ into a plain Python string.
        return str(np.random.choice([action, *slips], p=[0.8, 0.1, 0.1]))

    def nxtPosition(self, action):
        """Return the position reached by taking ``action`` from ``self.state``.

        In stochastic mode the action is first passed through
        ``_chooseActionProb``. If the resulting cell is off the board or is the
        blocked cell, the current position is returned instead. ``self.state``
        and ``self.determine`` are not modified, so repeated calls on the same
        instance keep the configured (deterministic or stochastic) behaviour.
        """
        if not self.determine:
            action = self._chooseActionProb(action)

        d_row, d_col = self._MOVES.get(action, self._MOVES["right"])
        candidate = (self.state[0] + d_row, self.state[1] + d_col)

        on_board = (0 <= candidate[0] <= BOARD_ROWS - 1) and (0 <= candidate[1] <= BOARD_COLS - 1)
        if on_board and candidate != self._WALL:
            return candidate
        return self.state

    def showBoard(self):
        """Print the board: '*' is the current position, 'z' the blocked cell, '0' a free cell.

        The board array itself is left untouched, so the marker always reflects
        the current ``self.state``.
        """
        for i in range(BOARD_ROWS):
            print('-----------------')
            out = '| '
            for j in range(BOARD_COLS):
                if (i, j) == self.state:
                    token = '*'
                elif self.board[i, j] == -1:
                    token = 'z'
                else:
                    token = '0'
                out += token + ' | '
            print(out)
        print('-----------------')

In [14]:
class Agent:
    """Tabular, epsilon-greedy agent that learns action values on the grid world.

    ``Q_values[(row, col)][action]`` is the learned value table. Each episode
    is recorded as a trace of ``[position, action]`` pairs; when a terminal
    cell is reached the trace is walked backwards and every visited entry is
    moved towards the discounted value of the step that followed it.
    """

    # Arrow drawn by showPolicy for each action.
    _ARROWS = {'up': ' ^  ', 'down': ' v  ', 'left': ' <--', 'right': ' -->'}

    def __init__(self):
        """Create the start environment and zero-initialise all tables."""
        self.states = []  # record position and action taken at the position
        self.actions = ["up", "down", "left", "right"]
        self.State = Environment()
        self.isEnd = self.State.isEnd
        self.lr = 0.2           # step size of the value update
        self.exp_rate = 0.3     # probability of taking a random action
        self.decay_gamma = 0.9  # discount applied per step during back-propagation

        self.state_values = {}
        self.policy = {}
        self.Q_values = {}
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                self.state_values[(i, j)] = 0
                # Q value is a dict of dict
                self.Q_values[(i, j)] = {a: 0 for a in self.actions}

    def chooseAction(self):
        """Pick the next action epsilon-greedily for the current position.

        A uniformly random action is returned when a uniform draw is
        <= ``exp_rate``; otherwise the action with the largest Q-value at the
        current position. Ties go to the action listed last in
        ``self.actions``.
        """
        if np.random.uniform(0, 1) <= self.exp_rate:
            return np.random.choice(self.actions)

        # Start below every possible value so that a state whose Q-values are
        # all negative still yields a real action instead of "".
        best_action = ""
        best_value = float("-inf")
        q_row = self.Q_values[self.State.state]
        for a in self.actions:
            if q_row[a] >= best_value:
                best_action = a
                best_value = q_row[a]
        return best_action

    def takeAction(self, action):
        """Return a fresh Environment positioned where ``action`` leads."""
        position = self.State.nxtPosition(action)
        return Environment(state=position)

    def reset(self):
        """Clear the episode trace and return to the start position."""
        self.states = []
        self.State = Environment()
        self.isEnd = self.State.isEnd

    def _backPropagate(self):
        """Propagate the terminal reward backwards along the recorded trace."""
        reward = self.State.giveReward()
        # Every action of the terminal cell gets the terminal reward itself.
        for a in self.actions:
            self.Q_values[self.State.state][a] = reward
        if DO_PRINT:
            print("Game End Reward", reward)
        for position, action in reversed(self.states):
            current_q_value = self.Q_values[position][action]
            # The unrounded result is carried on to the preceding step;
            # only the stored table entry is rounded.
            reward = current_q_value + self.lr * (self.decay_gamma * reward - current_q_value)
            self.Q_values[position][action] = round(reward, 3)

    def play(self, rounds=10):
        """Run ``rounds`` complete episodes, updating ``Q_values`` after each one."""
        finished = 0
        while finished < rounds:
            if self.State.isEnd:
                # Episode over: learn from it, then start again.
                self._backPropagate()
                self.reset()
                finished += 1
            else:
                action = self.chooseAction()
                # append trace
                self.states.append([(self.State.state), action])
                if DO_PRINT:
                    print("current position {} action {}".format(self.State.state, action))
                # by taking the action, it reaches the next state
                self.State = self.takeAction(action)
                # mark is end
                self.State.isEndFunc()
                if DO_PRINT:
                    print("nxt state", self.State.state)
                    print("---------------------")
                self.isEnd = self.State.isEnd

    def CalcValueFunction(self):
        """Derive the greedy policy and the state values from ``Q_values``.

        For every position, ``policy`` receives the index (into
        ``self.actions``) of the best action and ``state_values`` the largest
        Q-value. Values are assigned, not accumulated, so calling this method
        repeatedly gives the same result.
        """
        for position, action_values in self.Q_values.items():
            q_row = np.array([action_values[a] for a in self.actions], dtype=float)
            # following the optimum policy
            self.policy[position] = np.argmax(q_row)
            self.state_values[position] = np.max(q_row)

    # value function V*(s)
    def showValues(self):
        """Recompute and print the state values as a grid."""
        self.CalcValueFunction()
        for i in range(0, BOARD_ROWS):
            print('-----------------------------------')
            out = '| '
            for j in range(0, BOARD_COLS):
                out += "%+1.3f |" % self.state_values[(i, j)]
            print(out)
        print('-----------------------------------')

    # optimum policy pi*(s)
    def showPolicy(self):
        """Recompute and print the greedy action of every cell as an arrow."""
        self.CalcValueFunction()
        for i in range(0, BOARD_ROWS):
            print('-----------------------------------')
            out = '| '
            for j in range(0, BOARD_COLS):
                direction = self._ARROWS[self.actions[self.policy[(i, j)]]]
                out += "%s\t|" % direction
            print(out)
        print('-----------------------------------')
        
        

In [15]:
# Create the agent. reset() clears the episode trace and re-creates the start
# environment, which is redundant directly after construction.
ag = Agent()
ag.reset()

In [16]:
# Q-table before any training: every (position, action) entry is still 0.
ag.Q_values

{(0, 0): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (0, 1): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (0, 2): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (0, 3): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 0): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 1): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 2): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 3): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (2, 0): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (2, 1): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (2, 2): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (2, 3): {'up': 0, 'down': 0, 'left': 0, 'right': 0}}

In [17]:
# Train for 100 complete episodes; Q_values is updated at the end of each one.
ag.play(100)

In [18]:
# Q-table after training. Actions are sampled randomly, so the numbers differ between runs.
ag.Q_values

{(0, 0): {'up': 0.335, 'down': 0.262, 'left': 0.325, 'right': 0.506},
 (0, 1): {'up': 0.488, 'down': 0.482, 'left': 0.324, 'right': 0.68},
 (0, 2): {'up': 0.481, 'down': 0.449, 'left': 0.459, 'right': 0.87},
 (0, 3): {'up': 1, 'down': 1, 'left': 1, 'right': 1},
 (1, 0): {'up': 0.401, 'down': 0.133, 'left': 0.304, 'right': 0.268},
 (1, 1): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 2): {'up': 0.382, 'down': 0.058, 'left': 0.323, 'right': -0.562},
 (1, 3): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (2, 0): {'up': 0.323, 'down': 0.151, 'left': 0.169, 'right': 0.135},
 (2, 1): {'up': 0.023, 'down': 0.053, 'left': 0.211, 'right': -0.01},
 (2, 2): {'up': 0.185, 'down': 0.013, 'left': -0.004, 'right': -0.058},
 (2, 3): {'up': -0.18, 'down': -0.016, 'left': -0.002, 'right': -0.086}}

In [19]:
# Print the state values (derived from the largest Q-value of each cell) as a grid.
ag.showValues()

-----------------------------------
| +0.506 |+0.680 |+0.870 |+1.000 |
-----------------------------------
| +0.401 |+0.000 |+0.382 |-1.000 |
-----------------------------------
| +0.323 |+0.211 |+0.185 |-0.002 |
-----------------------------------


In [20]:
# Raw state-value dictionary as populated by the preceding showValues() call.
ag.state_values

{(0, 0): 0.506,
 (0, 1): 0.68,
 (0, 2): 0.87,
 (0, 3): 1.0,
 (1, 0): 0.401,
 (1, 1): 0.0,
 (1, 2): 0.382,
 (1, 3): -1.0,
 (2, 0): 0.323,
 (2, 1): 0.211,
 (2, 2): 0.185,
 (2, 3): -0.002}

In [21]:
# Print the greedy action (argmax over the Q-values) of every cell as an arrow.
# The blocked cell and the terminal cells are printed with an arrow as well.
ag.showPolicy()

-----------------------------------
|  -->	| -->	| -->	| ^  	|
-----------------------------------
|  ^  	| ^  	| ^  	| ^  	|
-----------------------------------
|  ^  	| <--	| ^  	| <--	|
-----------------------------------
