In [102]:
# Piero Pettenà - adapted from Zhang's standard grid-world implementation, available here:
# https://github.com/MJeremy2017/reinforcement-learning-implementation/blob/master/GridWorld/gridWorld.py#L11C1-L18C39


In [103]:
import numpy as np
from PIL import Image
import sys

# Grid size: number of rows and number of columns.
N_ROWS, N_COLS = 40, 40

# Initial target cell, given as (row, col).
IN_TARGET_POS = (3, 3)

# The agent starts in the middle of the grid.
START = (N_ROWS // 2, N_COLS // 2)

# Current target cell; it starts out at the initial target position.
TARGET_POS = IN_TARGET_POS

# Datum cell, kept as a mutable [row, col] list.
DATUM_POS = [4, 4]

# Step budget: the agent loses one unit per step that does not end on the
# target, and the episode stops once the budget is used up.
IN_BUDGET = 1_000

# TODO: implement a way to update DATUM_POS and TARGET_POS over time so that they drift with a water current

In [104]:
class State:
    """Grid-world state: the agent's cell plus board, target and datum bookkeeping.

    Positions are ``(row, col)`` pairs. Row 0 is the top row and column 0 the
    left-most column, so "up" decreases the row and "left" decreases the
    column (see ``nxtPosition``).
    """

    def __init__(self, state=START):
        """Create a state with the agent standing on ``state`` (default ``START``)."""
        self.board = np.zeros([N_ROWS, N_COLS])
        # Index with a plain tuple so exactly one cell is marked. The previous
        # ``board[*TARGET_POS]`` spelling only parses on Python 3.11 or newer.
        self.board[tuple(TARGET_POS)] = 1
        self.isEnd = False
        self.position = state          # starting position when initializing state
        # NOTE: this is the module-level DATUM_POS object itself, not a copy, so
        # in-place changes to that list are visible through every State.
        self.datum_pos = DATUM_POS
        self.datum_dist = 0
        self.distance = 0

    def isEndFunc(self):
        """Set ``isEnd`` to True when the agent stands on ``TARGET_POS``.

        The flag is only ever raised here, never cleared.
        """
        # Compare as tuples so a list-typed position still matches the target.
        if tuple(self.position) == tuple(TARGET_POS):
            self.isEnd = True

    # position of datum is always known for simplicity
    def relativePos(self):
        """Return ``(dx, dy, dpos)``: the offset from the agent to the datum.

        ``dx`` is the difference along index 0 and ``dy`` along index 1 of the
        positions; ``dpos`` is the Euclidean length of that offset.
        """
        dx = self.datum_pos[0] - self.position[0]
        dy = self.datum_pos[1] - self.position[1]
        dpos = np.sqrt(dx**2 + dy**2)

        return dx, dy, dpos

    def nxtPosition(self, action):
        """Return the cell reached by taking ``action`` from the current position.

        action: "up", "down", "left" or "right". Any other value is treated
        like "right", as in the original implementation.

        The agent's position is not modified. If the move would leave the
        ``N_ROWS`` x ``N_COLS`` grid, the current position is returned instead.
        """
        row, col = self.position[0], self.position[1]
        if action == "up":
            row -= 1
        elif action == "down":
            row += 1
        elif action == "left":
            col -= 1
        else:
            col += 1

        # Only accept the move when it stays inside the grid.
        if 0 <= row <= N_ROWS - 1 and 0 <= col <= N_COLS - 1:
            return (row, col)
        return self.position   # illegal new state, remain here

    def showBoard(self):
        """Print the board: '*' marks the target and the agent's cell, '0' the rest."""
        # Draw on a copy so repeated calls do not leave a trail of old agent
        # positions in self.board.
        view = self.board.copy()
        view[tuple(self.position)] = 1

        # Each cell prints as "x | " after a leading "| ", so the separator is
        # sized from the number of columns instead of being hard-coded.
        separator = '-' * (4 * N_COLS + 1)
        for i in range(N_ROWS):
            print(separator)
            out = '| '
            for j in range(N_COLS):
                token = '0' if view[i, j] == 0 else '*'
                out += token + ' | '
            print(out)
        print(separator)


# Agent of player

class Agent:
    """Random-walk agent that searches the grid for ``TARGET_POS`` under a step budget.

    Attributes:
        states: trace of the positions visited during the episode.
        actions: the four available moves.
        state: the current ``State``.
        lr, exp_rate: learning rate and exploration rate; not used by the
            current purely random policy.
        rewards: remaining step budget. It starts at ``IN_BUDGET`` and drops by
            one for every step that does not end on the target.
        state_values: value estimate per ``(row, col)`` cell, all initially 0.
        stats: how many times each action has been taken.
    """

    def __init__(self):
        self.states = []
        self.actions = ["up", "down", "left", "right"]
        self.state = State()
        self.lr = 0.2
        self.exp_rate = 0.3
        # The budget is decremented until it reaches 0 to simulate a time-out.
        self.rewards = IN_BUDGET
        # We are mapping states to values; every cell starts at value 0.
        self.state_values = {(i, j): 0 for i in range(N_ROWS) for j in range(N_COLS)}
        self.stats = dict.fromkeys(self.actions, 0)

    def chooseAction(self):
        """Return an action drawn uniformly at random from ``self.actions``."""
        # TODO: epsilon-greedy selection is disabled for now. The intended policy
        # is: with probability exp_rate pick a random action, otherwise pick the
        # action whose next cell has the highest entry in state_values. When
        # re-enabling it, note that breaking ties with ">=" favours the last
        # action in self.actions ("right").
        return np.random.choice(self.actions)

    def updateRewards(self):
        """Spend one unit of budget, or flag the episode as finished on the target."""
        if tuple(self.state.position) != tuple(TARGET_POS):
            self.rewards = self.rewards - 1
        else:
            self.state.isEnd = True

    def takeAction(self, action):
        """Move the agent to the cell that ``action`` leads to."""
        self.state.position = self.state.nxtPosition(action)

    def reset(self):
        """Start a fresh episode: new state, empty trace and a full step budget.

        The action counters in ``stats`` are left untouched.
        """
        self.states = []
        self.state = State()
        # Without this, an agent that timed out could never play again because
        # play() stops immediately when the budget is already 0.
        self.rewards = IN_BUDGET

    def play(self):
        """Walk until the target is reached or the step budget runs out.

        Every visited position is appended to ``self.states``. "Target found"
        is printed if the episode ends on the target.
        """
        # Record the starting cell once, so the trace does not begin at the
        # first move. Skipped when play() continues an existing trace.
        if not self.states:
            self.states.append(self.state.position)

        while self.rewards > 0 and not self.state.isEnd:
            action = self.chooseAction()
            # by taking the action, it reaches the next state
            self.takeAction(action)
            # append trace (the position actually reached)
            self.states.append(self.state.position)
            self.updateRewards()
            self.updateStats(action)
            # mark is end
            self.state.isEndFunc()

        if self.state.isEnd:
            print("Target found")
            # TODO: value back-propagation is disabled for now. The intended
            # update walks reversed(self.states), starting from
            # reward = self.rewards, and applies
            #   reward = V[s] + lr * (reward - V[s]);  V[s] = round(reward, 3)
            # before calling self.reset().

    def updateStats(self, action):
        """Count one more use of ``action``."""
        self.stats[action] += 1

    def showStats(self):
        """Print how many times each action has been taken."""
        print("Up \t Down \t Left \t Right")
        print(f"{self.stats['up']} \t {self.stats['down']} \t {self.stats['left']} \t {self.stats['right']}")

    def showValues(self):
        """Print the ``state_values`` table, one grid row per line."""
        # Each cell prints as a 6-character value plus " | " after a leading
        # "| ", so the separator is sized from the number of columns.
        separator = '-' * (9 * N_COLS + 1)
        for i in range(N_ROWS):
            print(separator)
            out = '| '
            for j in range(N_COLS):
                out += str(self.state_values[(i, j)]).ljust(6) + ' | '
            print(out)
        print(separator)

    def exportFig(self, filename='path_fig.png'):
        """Save the visited path as a PNG image.

        filename: output path; defaults to 'path_fig.png'.

        One pixel per grid cell: the path is white, the outermost rows and
        columns are tinted as a border (hiding path cells there), and the
        target is red.
        """
        # PIL sizes are (width, height) and pixels are addressed as (x, y),
        # so a (row, col) cell maps to pixel (col, row). This keeps "up" at the
        # top of the image and also works for non-square grids.
        width, height = N_COLS, N_ROWS
        path_fig = Image.new('RGB', (width, height), 'black')

        for row, col in self.states:
            path_fig.putpixel((col, row), (255, 255, 255))

        border_colour = (255, 83, 73)
        for x in range(width):
            path_fig.putpixel((x, 0), border_colour)
            path_fig.putpixel((x, height - 1), border_colour)
        for y in range(height):
            path_fig.putpixel((0, y), border_colour)
            path_fig.putpixel((width - 1, y), border_colour)

        # Draw the target last so the path (which ends on it when the target
        # is found) does not paint over the marker.
        target_row, target_col = TARGET_POS
        path_fig.putpixel((target_col, target_row), (255, 0, 0))

        path_fig.save(filename)


if __name__ == "__main__":
    # Run a single episode with a fresh agent.
    ag = Agent()
    ag.play()

    # Save the walked path as an image and report how often each move was used.
    ag.exportFig()
    ag.showStats()

    # Sanity check: the agent position and the target should have the same type.
    print(f"{type(ag.state.position)} {type(TARGET_POS)}")

Up 	 Down 	 Left 	 Right
245 	 254 	 261 	 240
<class 'tuple'> <class 'tuple'>
