In [9]:
import numpy as np
import cv2
# Configuration for the grid world used by PathFinder and the visualisation helpers.
#
# Conventions (as consumed by PathFinder.__init__ and visualize_):
#   * every location is [row, column];
#   * an obstacle is a rectangle given by its topLeft and bottomRight corners,
#     both corners inclusive;
#   * obstacle layers are painted in the order ultraSoft -> semiSoft -> soft -> hard,
#     so where rectangles overlap the layer painted last determines the cell reward;
#   * cells not covered by any rectangle keep the default step reward of -1.
#
# startLoc is the cell the agent is trained to reach (it receives "reward");
# change it here and retrain. endLoc is the cell the learned path is read from.
config = {
    "height": 30,
    "width": 30,
    "startLoc": [2, 4],
    "endLoc": [25, 18],
    # To add a new obstacle, append another {"topLeft": ..., "bottomRight": ...}
    # dictionary to the list of the desired layer.
    # Reward written into every cell of an ultra soft obstacle.
    "ultraSoftWeight": -2,
    "ultraSoftObstacles": [
        {"topLeft": [7, 5], "bottomRight": [12, 10]},
        {"topLeft": [13, 6], "bottomRight": [14, 9]},
        {"topLeft": [15, 9], "bottomRight": [16, 11]},
        {"topLeft": [8, 11], "bottomRight": [10, 12]},
        {"topLeft": [17, 9], "bottomRight": [18, 17]},
        {"topLeft": [10, 13], "bottomRight": [12, 14]},
        {"topLeft": [12, 14], "bottomRight": [16, 17]},
    ],
    # Reward written into every cell of a semi soft obstacle.
    "semiSoftWeight": -3,
    "semiSoftObstacles": [
        {"topLeft": [8, 6], "bottomRight": [11, 9]},
        {"topLeft": [12, 7], "bottomRight": [13, 10]},
        {"topLeft": [9, 10], "bottomRight": [10, 11]},
        {"topLeft": [14, 10], "bottomRight": [15, 11]},
        {"topLeft": [16, 12], "bottomRight": [17, 16]},
        {"topLeft": [11, 12], "bottomRight": [12, 13]},
        {"topLeft": [13, 14], "bottomRight": [15, 16]},
        
        {"topLeft": [11, 25], "bottomRight": [15, 27]},
        {"topLeft": [12, 23], "bottomRight": [19, 25]},
        {"topLeft": [13, 21], "bottomRight": [24, 24]},
        {"topLeft": [18, 20], "bottomRight": [24, 20]},
        {"topLeft": [20, 18], "bottomRight": [24, 19]},
    ],
    # Soft obstacle rectangles (their reward is "softWeight", defined below).
    "softObstacles": [
        {"topLeft": [9, 7], "bottomRight": [10, 9]},
        {"topLeft": [11, 8], "bottomRight": [12, 8]}, 
        {"topLeft": [12, 9], "bottomRight": [12,10]},
        {"topLeft": [10, 10], "bottomRight": [10, 10]},
        {"topLeft": [11, 11], "bottomRight": [14, 11]},
        {"topLeft": [12, 12], "bottomRight": [15, 12]},
        {"topLeft": [13, 13], "bottomRight": [16, 13]},
        {"topLeft": [14, 14], "bottomRight": [16, 15]},
        
        {"topLeft": [12, 25], "bottomRight": [14, 26]},
        {"topLeft": [13, 23], "bottomRight": [14, 24]},
        {"topLeft": [14, 22], "bottomRight": [18, 24]},
        {"topLeft": [18, 21], "bottomRight": [23, 23]},
        {"topLeft": [21, 19], "bottomRight": [23, 20]},
    ],
    # Reward written into every cell of a soft obstacle.
    "softWeight": -4,
    # Hard obstacle rectangles: entering one of these cells ends an episode.
    "hardObstacles": [
        {"topLeft": [9, 8], "bottomRight": [10, 8]},
        
        {"topLeft": [11, 9], "bottomRight": [11, 10]},
        {"topLeft": [12, 11], "bottomRight": [12, 11]},
        
        {"topLeft": [13, 12], "bottomRight": [14, 12]},
        
        {"topLeft": [15, 13], "bottomRight": [15, 14]},
        {"topLeft": [13, 25], "bottomRight": [13, 25]}, 
        
        {"topLeft": [14, 23], "bottomRight": [14, 24]},
        {"topLeft": [14, 23], "bottomRight": [17, 23]},
        
        {"topLeft": [18, 22], "bottomRight": [18, 23]},
        {"topLeft": [19, 22], "bottomRight": [22, 22]},
        
        {"topLeft": [22, 20], "bottomRight": [22, 22]},
#         {"topLeft": [15, 24], "bottomRight": [14, 25]}
        
    ],
    # Reward written into every cell of a hard obstacle; a cell holding this
    # value is treated as a terminal (crash) state.
    "hardWeight": -1000,
    # Reward placed on the startLoc cell, i.e. the goal the agent learns to reach.
    "reward": 1000000       
}
#class that finds the optimal path
class PathFinder:
    """Tabular Q-learning agent that learns how to reach ``config['startLoc']``.

    The environment is a ``height`` x ``width`` grid. Every cell carries a
    reward: -1 by default, the layer weight inside an obstacle rectangle, and
    ``config['reward']`` on the start location (the goal of every training
    episode). Hard-obstacle cells and the goal cell are terminal states.

    Attributes:
        environment_rows / environment_columns: grid dimensions.
        q_values: array of shape (rows, columns, 4), one value per action.
        actions: action names, indexed by action id.
        rewards: integer reward grid of shape (rows, columns).
        hard_weight / goal_reward: the two reward values that mark terminal cells,
            captured from the config passed to the constructor.
    """

    # (obstacle-list key, weight key) pairs, painted in this order so that a
    # later layer overwrites an earlier one wherever rectangles overlap.
    _OBSTACLE_LAYERS = (
        ("ultraSoftObstacles", "ultraSoftWeight"),
        ("semiSoftObstacles", "semiSoftWeight"),
        ("softObstacles", "softWeight"),
        ("hardObstacles", "hardWeight"),
    )

    def __init__(self, config):
        """Build the reward grid and a zero-initialised Q-table from ``config``.

        Args:
            config: dict with ``height``, ``width``, ``startLoc`` ([row, col]),
                ``hardWeight`` and ``reward``. Each obstacle layer
                (``<layer>Obstacles`` plus its ``<layer>Weight``) is optional;
                a missing layer is treated as empty. Rectangles are given by
                inclusive ``topLeft`` / ``bottomRight`` [row, col] corners.

        Raises:
            AssertionError: if ``startLoc`` lies on a hard obstacle.
        """
        self.environment_rows = config['height']
        self.environment_columns = config['width']
        # Remember the terminal markers on the instance so that later changes to
        # (or the absence of) a global ``config`` cannot alter the environment.
        self.hard_weight = config["hardWeight"]
        self.goal_reward = config['reward']
        #initializing qValues
        #our goal is to learn this values from many experimental runs
        self.q_values = np.zeros((self.environment_rows, self.environment_columns, 4))
        #set of possible actions
        self.actions = ['up', 'right', 'down', 'left']
        #rewards matrix(set to -1 at start)
        self.rewards = np.full((self.environment_rows, self.environment_columns), -1)
        #painting every obstacle layer with its weight
        for obstacles_key, weight_key in self._OBSTACLE_LAYERS:
            for box in config.get(obstacles_key, []):
                top, left = box["topLeft"][0], box["topLeft"][1]
                bottom, right = box["bottomRight"][0], box["bottomRight"][1]
                self.rewards[top:bottom + 1, left:right + 1] = config[weight_key]
        start_row, start_column = config["startLoc"][0], config["startLoc"][1]
        #will raise an error if startpoint is on hard obstacle
        # Raised explicitly (not via ``assert``) so the check survives ``python -O``.
        if self.rewards[start_row, start_column] == self.hard_weight:
            raise AssertionError("Start point can not be inside hard obstacles!")
        #initializing startLocation and reward for getting to it
        self.rewards[start_row, start_column] = self.goal_reward

    def is_terminal_state(self, current_row_index, current_column_index):
        """Return True when the cell ends an episode.

        A cell is terminal when its reward equals the hard-obstacle weight
        (crash) or the goal reward (startLoc reached).
        """
        cell_reward = self.rewards[current_row_index, current_column_index]
        return bool(cell_reward == self.hard_weight or cell_reward == self.goal_reward)

    def get_starting_location(self):
        """Draw a uniformly random non-terminal cell to start a training episode.

        Returns:
            (row, column) of the chosen cell.
        """
        current_row_index = np.random.randint(self.environment_rows)
        current_column_index = np.random.randint(self.environment_columns)
        while self.is_terminal_state(current_row_index, current_column_index):
            current_row_index = np.random.randint(self.environment_rows)
            current_column_index = np.random.randint(self.environment_columns)
        return current_row_index, current_column_index

    def get_next_action(self, current_row_index, current_column_index, epsilon):
        """Choose an action index for the given cell.

        Args:
            epsilon: probability of taking the greedy (highest Q-value) action;
                with probability ``1 - epsilon`` a uniformly random action is
                taken instead. Note that this is the probability of
                exploitation, not of exploration.

        Returns:
            Index into ``self.actions``.
        """
        if np.random.random() < epsilon:
            return np.argmax(self.q_values[current_row_index, current_column_index])
        else:
            return np.random.randint(4)

    def get_next_location(self, current_row_index, current_column_index, action_index):
        """Return the cell reached by applying an action.

        A move that would leave the grid is ignored and the current cell is
        returned unchanged.

        Returns:
            (row, column) after the move.
        """
        new_row_index = current_row_index
        new_column_index = current_column_index
        if self.actions[action_index] == 'up' and current_row_index > 0:
            new_row_index -= 1
        elif self.actions[action_index] == 'right' and current_column_index < self.environment_columns - 1:
            new_column_index += 1
        elif self.actions[action_index] == 'down' and current_row_index < self.environment_rows - 1:
            new_row_index += 1
        elif self.actions[action_index] == 'left' and current_column_index > 0:
            new_column_index -= 1
        return new_row_index, new_column_index

    #AFTER TRAINING
    def get_shortest_path(self, start_row_index, start_column_index):
        """Follow the greedy policy from the given cell until a terminal cell.

        After training, the given cell plays the role of the end location and
        the returned path leads from it to the trained goal (startLoc). The
        method can be called repeatedly with different cells without
        retraining; changing startLoc requires a new agent and new training.

        Returns:
            List of [row, column] cells beginning with the given cell. The list
            is empty when the given cell is itself terminal. If the greedy
            policy returns to a cell it has already visited (for example with
            an untrained Q-table), the walk stops there instead of looping
            forever, so the last cell may be non-terminal.
        """
        shortest_path = []
        if self.is_terminal_state(start_row_index, start_column_index):
            return shortest_path
        current_row_index, current_column_index = start_row_index, start_column_index
        shortest_path.append([current_row_index, current_column_index])
        # The greedy policy is deterministic, so revisiting a cell means a cycle.
        visited = {(current_row_index, current_column_index)}
        while not self.is_terminal_state(current_row_index, current_column_index):
            action_index = self.get_next_action(current_row_index, current_column_index, 1.)
            current_row_index, current_column_index = self.get_next_location(
                current_row_index, current_column_index, action_index)
            if (current_row_index, current_column_index) in visited:
                break
            visited.add((current_row_index, current_column_index))
            shortest_path.append([current_row_index, current_column_index])
        return shortest_path

    #training function
    def train(self, epoch, epsilon=0.8, discount_factor=0.99, learning_rate=0.4):
        """Run Q-learning episodes and update ``self.q_values`` in place.

        Each episode starts in a random non-terminal cell and runs until a
        terminal cell is entered.

        Args:
            epoch: number of episodes to run.
            epsilon: probability of taking the greedy action (see get_next_action).
            discount_factor: weight of the best next-state Q-value in the update.
            learning_rate: step size applied to the temporal difference.
        """
        for _ in range(epoch):
            row_index, column_index = self.get_starting_location()

            while not self.is_terminal_state(row_index, column_index):
                #while experimental run is not finished,update Qtable according to Bellman equation
                action_index = self.get_next_action(row_index, column_index, epsilon)

                old_row_index, old_column_index = row_index, column_index
                row_index, column_index = self.get_next_location(row_index, column_index, action_index)

                reward = self.rewards[row_index, column_index]
                old_q_value = self.q_values[old_row_index, old_column_index, action_index]
                temporal_difference = reward + (discount_factor * np.max(self.q_values[row_index, column_index])) - old_q_value

                new_q_value = old_q_value + (learning_rate * temporal_difference)
                self.q_values[old_row_index, old_column_index, action_index] = new_q_value

#function to visualize given path
def visualize_(boardSize, path, start, end):
    """Draw the grid, the obstacles, a path and its end points in an OpenCV window.

    Obstacles are read from the module-level ``config``; a layer that is missing
    from it is simply not drawn. The call blocks until a key is pressed in the
    window.

    Args:
        boardSize: (height, width) of the grid in cells.
        path: sequence of [row, column] cells; consecutive cells are joined by a line.
        start: [row, column] of the start location marker.
        end: [row, column] of the end location marker.
    """
    cell = 20            # side of one grid cell in pixels
    half = cell // 2     # offset from a cell corner to its centre
    # Image rows follow grid rows (height) and image columns follow grid
    # columns (width), so non-square boards are rendered correctly.
    height_px = boardSize[0] * cell
    width_px = boardSize[1] * cell
    img = np.zeros((height_px + 1, width_px + 1, 3), dtype = "uint8") #3channel

    grid_color = (255,255,255)
    # vertical grid lines, one per column boundary
    for x in range(0, width_px + 1, cell):
        cv2.line(img, (x, 0), (x, height_px - 1), grid_color, 1)
    # horizontal grid lines, one per row boundary
    for y in range(0, height_px + 1, cell):
        cv2.line(img, (0, y), (width_px, y), grid_color, 1)

    # obstacle outlines, one colour per layer; OpenCV points are (x, y) = (column, row)
    layer_colors = (
        ('ultraSoftObstacles', (100,50,0)),
        ('semiSoftObstacles', (100,100,0)),
        ('softObstacles', (100,0,100)),
        ('hardObstacles', (255,0,255)),
    )
    for layer, layer_color in layer_colors:
        for obs in config.get(layer, []):
            top_left = (obs['topLeft'][1] * cell, obs['topLeft'][0] * cell)
            bottom_right = ((obs['bottomRight'][1] + 1) * cell, (obs['bottomRight'][0] + 1) * cell)
            cv2.rectangle(img, top_left, bottom_right, layer_color, 2)

    # path segments drawn between the centres of consecutive cells
    color = (255,0,0)
    for ind in range(len(path) - 1):
        cv2.line(img,
                 (path[ind][1] * cell + half, path[ind][0] * cell + half),
                 (path[ind + 1][1] * cell + half, path[ind + 1][0] * cell + half),
                 color, 3)

    # start and end markers
    cv2.circle(img, (start[1] * cell + half, start[0] * cell + half), 3, (0,255,255), -1)
    cv2.circle(img, (end[1] * cell + half, end[0] * cell + half), 3, (255,0,255), -1)

    cv2.namedWindow('Optimal path', cv2.WINDOW_NORMAL)
    cv2.imshow('Optimal path', img)
    cv2.waitKey(0)

#for dynamic ploting
#press any key(e.g. enter) for next step
def visualize(path):
    """Replay ``path`` frame by frame, revealing it from its last cell backwards.

    The first frame shows no path at all; every following frame adds one more
    cell at the front of the displayed tail until the complete path is shown.
    Each frame waits for a key press, and all windows are closed at the end.
    """
    board = (config['height'], config['width'])
    tail_start = len(path)
    while tail_start >= 0:
        visualize_(board, path[tail_start:], config['startLoc'], config['endLoc'])
        tail_start -= 1
    cv2.destroyAllWindows()
                
# Build the agent from the config above and train it.
# PathFinder raises AssertionError when startLoc lies on a hard obstacle; in that
# case only the message is printed and `short` is not (re)bound by this cell, so
# the following cell cannot use a freshly trained agent.
try:
    short = PathFinder(config)
    # Train for 10000 episodes (each episode is one experimental run).
    short.train(10000)
except AssertionError as e:
    print(e)
    

In [10]:
# Read the learned route that connects config['endLoc'] with config['startLoc']
# (the walk starts at endLoc and follows the greedy policy to the trained goal).
# Changing startLoc requires re-running the training cell above.
# To try a different end point, uncomment and edit the line below and re-run
# only this cell; no retraining is needed.
# config['endLoc'] = [0,9]
path = short.get_shortest_path(config['endLoc'][0], config['endLoc'][1])
visualize(path)

In [None]:
import numpy as np
import cv2
# Reduced configuration: only the soft and hard obstacle layers are defined
# (there are no ultraSoft/semiSoft keys here).
# NOTE: running this cell rebinds the module-level name `config`, which
# visualize_ and visualize read at call time.
# Locations are [row, column]; rectangle corners are inclusive.
# startLoc should be changed from here.
config = {
    "height": 30,
    "width": 30,
    "startLoc": [2, 4],
    "endLoc": [25, 25],
    # To add a new obstacle, append another {"topLeft": ..., "bottomRight": ...}
    # dictionary to the list of the desired layer.
    "softObstacles": [
        {"topLeft": [9, 7], "bottomRight": [10, 9]},
        {"topLeft": [11, 8], "bottomRight": [12, 8]}, 
        {"topLeft": [12, 9], "bottomRight": [12,10]},
        {"topLeft": [10, 10], "bottomRight": [10, 10]},
        {"topLeft": [11, 11], "bottomRight": [14, 11]},
        {"topLeft": [12, 12], "bottomRight": [15, 12]},
        {"topLeft": [13, 13], "bottomRight": [16, 13]},
        {"topLeft": [14, 14], "bottomRight": [16, 15]},
        
        {"topLeft": [12, 25], "bottomRight": [14, 26]},
        {"topLeft": [13, 23], "bottomRight": [14, 24]},
        {"topLeft": [15, 22], "bottomRight": [18, 24]},
        {"topLeft": [18, 21], "bottomRight": [23, 23]},
        {"topLeft": [21, 19], "bottomRight": [23, 20]},
    ],
    # Reward written into every cell of a soft obstacle.
    "softWeight": -2,
    "hardObstacles": [
        {"topLeft": [9, 8], "bottomRight": [10, 8]},
        
        {"topLeft": [11, 9], "bottomRight": [11, 10]},
        {"topLeft": [12, 11], "bottomRight": [12, 11]},
        
        {"topLeft": [13, 12], "bottomRight": [14, 12]},
        
        {"topLeft": [15, 13], "bottomRight": [15, 14]},
        {"topLeft": [13, 25], "bottomRight": [13, 25]}, 
        
        {"topLeft": [14, 23], "bottomRight": [14, 24]},
        {"topLeft": [15, 23], "bottomRight": [17, 23]},
        
        {"topLeft": [18, 22], "bottomRight": [18, 23]},
        {"topLeft": [19, 22], "bottomRight": [22, 22]},
        
        {"topLeft": [22, 20], "bottomRight": [22, 22]},
#         {"topLeft": [15, 24], "bottomRight": [14, 25]}
        
    ],
    # Reward written into every cell of a hard obstacle.
    "hardWeight": -1000,
    # Goal reward value (not written into the grid by the standalone cell below).
    "reward": 1000000       
}

In [None]:
# Grid dimensions come from the active config.
environment_rows = config['height']
environment_columns = config['width']

# Q-table: one value per (row, column, action); these are the values an agent
# would learn over many experimental runs.
q_values = np.zeros((environment_rows, environment_columns, 4))

# Set of possible actions, indexed by action id.
actions = ['up', 'right', 'down', 'left']

# Reward grid: every cell starts at -1, then the soft layer is painted and the
# hard layer is painted on top of it (so hard wins where rectangles overlap).
rewards = np.full((environment_rows, environment_columns), -1)
for layer_key, weight_key in (("softObstacles", "softWeight"), ("hardObstacles", "hardWeight")):
    for box in config[layer_key]:
        row_span = slice(box["topLeft"][0], box["bottomRight"][0] + 1)
        col_span = slice(box["topLeft"][1], box["bottomRight"][1] + 1)
        rewards[row_span, col_span] = config[weight_key]

In [None]:
# Echo the reward grid built in the previous cell for a visual sanity check.
rewards