# Machine Learning Engineer Nanodegree

## Capstone Project - Code Base

Mathias von Kaiz  
August 1st, 2017

**This code base is to evaluate functions, algorithms for developing the final Robot class.**

In [1]:
from maze import Maze
#from robot import Robot
import sys
import numpy as np
import random
import copy

In [2]:
dir_mazes = {'maze1': 'test_maze_01.txt',
            'maze2': 'test_maze_02.txt',
            'maze3': 'test_maze_01.txt'}
testmaze = Maze( str(dir_mazes['maze1']) )


#####################

_init = [0, 0]
_distance = [0, 1, 2, 3]

#####################
_steering = ['left', 'front', 'right']
_dir_steering_to_rotation = {'left': -90, 'front': 0, 'right': 90}
_dir_steering_to_sensor_index = {'left': 0, 'front': 1, 'right': 2}

_dir_map_value = {'N': 1, 'E': 2, 'S': 4 , 'W': 8 }

_heading = ['u', 'r', 'd', 'l']
_dir_heading_to_map = {'u': 'N', 'r': 'E', 'd': 'S', 'l': 'W',
                       'up': 'N', 'right': 'E', 'down': 'S', 'left': 'W'}
_dir_int = {'u': 1, 'r': 2, 'd': 4, 'l': 8,
            'up': 1, 'right': 2, 'down': 4, 'left': 8}

_dir_heading_to_symbol = {'u': '^', 'r': '>', 'd': 'v', 'l': '<',
                          'up': '^', 'right': '>', 'down': 'v', 'left': '<'}


_dir_sensors = {'u': ['l', 'u', 'r'], 'r': ['u', 'r', 'd'],
               'd': ['r', 'd', 'l'], 'l': ['d', 'l', 'u'],
               'up': ['l', 'u', 'r'], 'right': ['u', 'r', 'd'],
               'down': ['r', 'd', 'l'], 'left': ['d', 'l', 'u']}
_dir_move = {'u': [0, 1], 'r': [1, 0], 'd': [0, -1], 'l': [-1, 0],
            'up': [0, 1], 'right': [1, 0], 'down': [0, -1], 'left': [-1, 0]}
_dir_reverse = {'u': 'd', 'r': 'l', 'd': 'u', 'l': 'r',
               'up': 'd', 'right': 'l', 'down': 'u', 'left': 'r'}





In [3]:
class Map(object):
    def __init__(self, map_dim):
        '''
        Use the initialization function to set up attributes
        '''

        self.map_dim = map_dim
        self.grid = [[-1 for col in range(map_dim)] for row in range(map_dim)]
        self.visited = [[-1 for col in range(map_dim)] for row in range(map_dim)]
        self.moved = [[' ' for col in range(map_dim)] for row in range(map_dim)]
        
        
        
    def reset(self):
        self.visited = [[-1 for col in range(self.map_dim)] for row in range(self.map_dim)]
        self.moved = [[' ' for col in range(self.map_dim)] for row in range(self.map_dim)]
        
    def is_permissible(self, cell, direction):
        """
        Returns a boolean designating whether or not a cell is passable in the
        given direction. Cell is input as a list. Directions may be
        input as single letter 'u', 'r', 'd', 'l', or complete words 'up', 
        'right', 'down', 'left'.
        """
        if self.is_unknown(cell):
            return False
        
        try:
            return (self.grid[cell[0]][cell[1]] & _dir_int[direction] != 0)
        except:
            print cell, direction, ' is a wall!'
           
        
        
    def is_unknown(self, cell):
        return self.grid[cell[0]][cell[1]] == -1
        
        
        
    def expand(self, location, heading, sensors):
        value = 0
        # TODO refactor
        # check sensor steering
        for direction in _steering:
            if sensors[_dir_steering_to_sensor_index[direction]] > 0:
                i = _dir_map_value[_dir_heading_to_map[_dir_sensors[heading][_dir_steering_to_sensor_index[direction]]]]
                value += i
                
        # map reverse if not at starting position
        # TODO: refactor fo better way
        if (location != _init):
            value += _dir_map_value[_dir_heading_to_map[_dir_reverse[heading]]]
        
        # set value
        if self.grid[location[0]][location[1]] == -1:
            self.grid[location[0]][location[1]] = value
                

In [140]:
class Plan(object):
    # --------
    # init: 
    #    creates an empty plan
    #
    def __init__(self, robot, cost = 1):
        '''
        Use the initialization function to set up attributes
        '''
        self.robot = robot
        self.map = robot.map
        self.grid_size = len(robot.map.grid[0])
        
        self.start = robot.get_start()
        self.init = robot.location
        self.set_goal()
        
        self.is_exploring = True
        self.is_goal_reached = False
        self.is_looking_for_start = False
        
        self.cost = cost
        self.make_heuristic()
        
        self.path = []
        self.policy = []
        self.expand = []
        
    def reset(self):
        self.start = self.robot.get_start()
        self.init = self.robot.location
        self.set_goal()
        
        self.is_exploring = False
        self.is_goal_reached = False
        self.is_looking_for_start = False
        
        self.path = []
        self.policy = []
        self.expand = []
        
    # --------
    #
    # calculate goal function for a grid
    
    def set_goal(self, goal_type='Goal'):
        if goal_type == 'Goal':
            self.goal = [self.grid_size/2 - 1, self.grid_size/2]
        elif goal_type == 'Start':
            self.goal = self.robot.start
            
    
    def set_goal_reached(self):
        self.is_goal_reached = self.robot.location[0] in self.goal and self.robot.location[1] in self.goal
    
    def set_steering(self, idx = 0):
        steering = _steering[0]
        for direction in _steering:
                if _dir_sensors[self.robot.heading][_dir_steering_to_sensor_index[direction]] == self.path[idx]:
                    steering = direction
                    
        return steering
    
    # --------
    #
    # make heuristic function for a grid
        
    def make_heuristic(self):
        self.heuristic = [[0 for row in range(self.grid_size)] 
                          for col in range(self.grid_size)]
        
        for i in range(self.grid_size):    
            for j in range(self.grid_size):
                self.heuristic[i][j] = abs(i - self.goal[0]) + abs(j - self.goal[1])
                    
                    
    # --------
    #
    # Random search for goal
    
    def random(self):
        steering = random.choice(_steering) 
        movement = random.choice(_distance)
        
        return steering, movement
    
    
    # --------
    #
    # A* search for goal
    def astar_search(self, init):
        # Init arrays
        closed = [[0 for col in range(self.grid_size)] for row in range(self.grid_size)]
        closed[init[0]][init[1]] = 1
        expand = [[-1 for col in range(self.grid_size)] for row in range(self.grid_size)]
        action = [[-1 for col in range(self.grid_size)] for row in range(self.grid_size)]

        # Init parameters
        x = init[0]
        y = init[1]
        g = 0
        h = self.heuristic[x][y]
        f = g + h

        found = False
        resign = False
        count = 0
        
        # Set open on beginning of algorithm
        open = [[f, h, g, x, y]]

        # Loop while goal is not found or stuck
        while not found and not resign:
            
            # Goal found
            if len(open) == 0:
                resign = True
                print ("Failed as not path found")
            
            # Goal not yet found
            else:
                open.sort()
                open.reverse()
                next = open.pop()
                x = next[3]
                y = next[4]
                g = next[2]

                expand[x][y] = count
                count += 1

                # check if we are done
                if x == self.goal[0] and y == self.goal[1]:
                    found = True

                else:
                    for i in range(len(_heading)):
                        heading = _dir_move[_heading[i]]

                        x2 = x
                        y2 = y

                        # check if we can proceed or if unknown
                        if self.map.is_permissible([x2, y2], _heading[i]) or self.map.is_unknown([x2, y2]):
                            x2 = x + heading[0]
                            y2 = y + heading[1]
                            
                            if  x2 >= 0 and x2 < self.grid_size and y2 >= 0 and y2 < self.grid_size:
                                if closed[x2][y2] == 0:
                                    g2 = g + self.cost
                                    h2 = self.heuristic[x2][y2]
                                    f2 = g2 + h2

                                    open.append([f2, h2, g2, x2, y2])
                                    closed[x2][y2] = 1
                                    action[x2][y2] = i
        
        
        # Set optimal policy
        policy = [[' ' for col in range(self.grid_size)] for row in range(self.grid_size)]
        x = self.goal[0]
        y = self.goal[1]
        policy[x][y] = '*'
        
        # Set optimal path
        invpath = []
        while x != init[0] or y != init[1]:
            x2 = x - _dir_move[_heading[action[x][y]]][0]
            y2 = y - _dir_move[_heading[action[x][y]]][1]
            policy[x2][y2] = _dir_heading_to_symbol[_heading[action[x][y]]]
            invpath.append(_heading[action[x][y]])
            x = x2
            y = y2
            
        path = []
        for i in range(len(invpath)):
            path.append(invpath[len(invpath) - 1 - i])
            
        # Return path, policy and expand arrays
        return path, policy, expand
    
    
    # --------
    #
    # A*
    def astar(self, path=False):
        
        if self.heuristic == []:
            raise ValueError, "Heuristic must be defined to run A*"
        
        # perform a* search
        self.path, self.policy, self.expand = self.astar_search(self.init)
        
        # set steering
        if len(self.path) != 0:   
            steering = self.set_steering()
        
        # differentiate between path searching or next steering
        if path:
            # return optimal path and policy
            return self.path, self.policy
        else:
            # return steering direction and movement 1
            return steering, 1
    
    
    # --------
    #
    # Dynamic pogramming search for goal
    def dp_search(self, init):
        value = [[999 for col in range(self.grid_size)] for col in range(self.grid_size)]
        action = [[-1 for col in range(self.grid_size)] for row in range(self.grid_size)]
        
#         policy = [[' ' for col in range(self.grid_size)] for row in range(self.grid_size)]
#         policy[self.goal[0]][self.goal[1]] = '*'
    
        change = True
        while change:
            change = False

            for x in range(self.grid_size):
                for y in range(self.grid_size):

                    if x == self.goal[0] and y == self.goal[1]:
                        if value[x][y] > 0:
                            value[x][y] = 0
                            change = True

                    else:
                        for i in range(len(_heading)):
                            heading = _dir_move[_heading[i]]

                            x2 = x
                            y2 = y

                            # check if we can proceed or if unknown
                            if self.map.is_permissible([x2, y2], _heading[i]):
                                x2 = x + heading[0]
                                y2 = y + heading[1]

                                if  x2 >= 0 and x2 < self.grid_size and y2 >= 0 and y2 < self.grid_size:
                                    v2 = value[x2][y2] + self.cost
                                    
                                    if v2 < value[x][y]:
                                        change = True
                                        value[x][y] = v2
#                                         policy[x][y] = _dir_heading_to_symbol[_heading[i]]
                                        action[x][y] = i

        # Set optimal policy
        policy = [[' ' for col in range(self.grid_size)] for row in range(self.grid_size)]
        x = self.goal[0]
        y = self.goal[1]
        policy[x][y] = '*'
        
        # Set optimal path
        invpath = []
        while x != init[0] or y != init[1]:
            print(x, y)
            print(np.rot90(action))
            x2 = x - _dir_move[_heading[action[x][y]]][0]
            y2 = y - _dir_move[_heading[action[x][y]]][1]
            
            if action[x][y] >= 0:
                print(action[x][y])
                print(_dir_heading_to_symbol[_heading[action[x][y]]])
                policy[x2][y2] = _dir_heading_to_symbol[_heading[action[x][y]]]
                
                invpath.append(_heading[action[x][y]])
            else:
                pass
                #x2 = x
                #y2 = y
            
            x = x2
            y = y2
            
        path = []
        for i in range(len(invpath)):
            path.append(invpath[len(invpath) - 1 - i])

        # make sure your function returns a grid of values as 
        # demonstrated in the previous video.
        
        return path, policy

    # --------
    #
    # Dynamic Programming
    def dp(self, path=False):
        
        # perform dynamic programming search
        self.path, self.policy = self.dp_search(self.init)
        
        # set steering
        if len(self.path) != 0:   
            steering = self.set_steering()
        
        # differentiate between path searching or next steering
        if path:
            # return optimal path and policy
            return self.path, self.policy
        else:
            # return steering direction and movement 1
            return steering, 1

In [141]:
class Robot(object):
    def __init__(self, map_dim):
        '''
        Use the initialization function to set up attributes that your robot
        will use to learn and navigate the maze. Some initial attributes are
        provided based on common information, including the size of the maze
        the robot is placed in.
        '''
        self.start = [0, 0]
        self.location = self.get_start()
        self.heading = 'up'
        
        self.map = Map(map_dim)
        self.plan = Plan(self)
        
        self.timespan = 100
        self.timestep = 0
        self.max_timestep = 1000
        
    def reset(self):
        self.location = self.get_start()
        self.heading = 'up'
        self.timestep = 0
            
    
    def set_heading(self, steering):
        self.heading = _dir_sensors[self.heading][_dir_steering_to_sensor_index[steering]]
        
    def get_start(self):
        return copy.copy(self.start)
    
    

    def next_move(self, sensors):
        '''
        Use this function to determine the next move the robot should make,
        based on the input from the sensors after its previous move. Sensor
        inputs are a list of three distances from the robot's left, front, and
        right-facing sensors, in that order.

        Outputs should be a tuple of two values. The first value indicates
        robot rotation (if any), as a number: 0 for no rotation, +90 for a
        90-degree rotation clockwise, and -90 for a 90-degree rotation
        counterclockwise. Other values will result in no rotation. The second
        value indicates robot movement, and the robot will attempt to move the
        number of indicated squares: a positive number indicates forwards
        movement, while a negative number indicates backwards movement. The
        robot may move a maximum of three units per turn. Any excess movement
        is ignored.

        If the robot wants to end a run (e.g. during the first training run in
        the maze) then returing the tuple ('Reset', 'Reset') will indicate to
        the tester to end the run and return the robot to the start.
        '''

        rotation = 0
        movement = 0
        
        # for report purposes
        old_location = [self.location[0], self.location[1]]
        
        # map surrounding
        self.map.expand(self.location, self.heading, sensors)
        
        # set visited
        self.map.visited[self.location[0]][self.location[1]] = 1
        
        # check if time is elapsed
        if self.timestep == self.max_timestep - self.timespan:
            # Debug Reports
            print (np.rot90(self.map.moved))
            #print (np.rot90(self.map.grid))
            #print (np.rot90(self.plan.expand))
            #print (np.rot90(self.plan.map.grid))
            
            # Reset everything
            self.reset()
            self.map.reset()
            self.plan.reset()
            
            # different search path algorithms
#             self.plan.path, self.plan.policy = self.plan.astar(True)
            self.plan.path, self.plan.policy = self.plan.dp(True)
            
            # Debug Reports
            print (np.rot90(self.plan.policy))
            
            # Reset run
            return ('Reset', 'Reset')    
        
        # check for exploration mode
        if self.plan.is_exploring:
            # check for goal
            self.plan.set_goal_reached()
            
            if self.plan.is_goal_reached:
                # explore back to start in case it is not already doing so
                if not self.plan.is_looking_for_start:
                    self.plan.set_goal('Start')
                    self.plan.is_looking_for_start = True
                else:
                    self.plan.set_goal()
                    self.plan.is_looking_for_start = False
            
            # different explore algorithms
#            steering, movement = self.plan.random()
            steering, movement = self.plan.astar()
        else:
            # walk by given path
            steering, movement = self.plan.set_steering(self.timestep), 1
        
        
        # perform rotation and set new heading
        rotation = _dir_steering_to_rotation[steering]
        self.set_heading(steering)
              
        # perform movement if possible - TODO refactor
        if movement > 0:
            if  movement <= sensors[_dir_steering_to_sensor_index[steering]]:
                # perform movement
                movement_steps = movement
                while movement_steps:
                    # map movement
                    self.map.moved[self.location[0]][self.location[1]] = _dir_heading_to_symbol[self.heading]
                    
                    # perform move
                    self.location[0] += _dir_move[self.heading][0]
                    self.location[1] += _dir_move[self.heading][1]
                    movement_steps -= 1
            else:
                movement = 0
               
        # Debug Report
        print '{} [{:>2d},{:>2d},{:>2d}] {:>3d} => {}, {:>2d} steps = {}'.format(
            old_location,
            sensors[0], sensors[1], sensors[2],
            rotation,
            self.heading,
            movement,
            self.location)
        
        # increase time
        self.timestep += 1
        
        # return actions
        return rotation, movement

In [139]:
dir_sensors = {'u': ['l', 'u', 'r'], 'r': ['u', 'r', 'd'],
               'd': ['r', 'd', 'l'], 'l': ['d', 'l', 'u'],
               'up': ['l', 'u', 'r'], 'right': ['u', 'r', 'd'],
               'down': ['r', 'd', 'l'], 'left': ['d', 'l', 'u']}
dir_move = {'u': [0, 1], 'r': [1, 0], 'd': [0, -1], 'l': [-1, 0],
            'up': [0, 1], 'right': [1, 0], 'down': [0, -1], 'left': [-1, 0]}
dir_reverse = {'u': 'd', 'r': 'l', 'd': 'u', 'l': 'r',
               'up': 'd', 'right': 'l', 'down': 'u', 'left': 'r'}

# test and score parameters
max_time = 1000
train_score_mult = 1/30.

if __name__ == '__main__':
    '''
    This script tests a robot based on the code in robot.py on a maze given
    as an argument when running the script.
    '''

    # Create a maze based on input argument on command line.
    testmaze = Maze( str(dir_mazes['maze1']) )

    # Intitialize a robot; robot receives info about maze dimensions.
    testrobot = Robot(testmaze.dim)

    # Record robot performance over two runs.
    runtimes = []
    total_time = 0
    for run in range(2):
        print "Starting run {}.".format(run)

        # Set the robot in the start position. Note that robot position
        # parameters are independent of the robot itself.
        robot_pos = {'location': [0, 0], 'heading': 'up'}

        run_active = True
        hit_goal = False
        while run_active:
            # check for end of time
            total_time += 1
            if total_time > max_time:
                run_active = False
                print "Allotted time exceeded."
                break

            # provide robot with sensor information, get actions
            sensing = [testmaze.dist_to_wall(robot_pos['location'], heading)
                       for heading in dir_sensors[robot_pos['heading']]]
            rotation, movement = testrobot.next_move(sensing)

            # check for a reset
            if (rotation, movement) == ('Reset', 'Reset'):
                if run == 0 and hit_goal:
                    run_active = False
                    runtimes.append(total_time)
                    print "Ending first run. Starting next run."
                    break
                elif run == 0 and not hit_goal:
                    print "Cannot reset - robot has not hit goal yet."
                    continue
                else:
                    print "Cannot reset on runs after the first."
                    continue

            # perform rotation
            if rotation == -90:
                robot_pos['heading'] = dir_sensors[robot_pos['heading']][0]
            elif rotation == 90:
                robot_pos['heading'] = dir_sensors[robot_pos['heading']][2]
            elif rotation == 0:
                pass
            else:
                print "Invalid rotation value, no rotation performed."

            # perform movement
            if abs(movement) > 3:
                print "Movement limited to three squares in a turn."
            movement = max(min(int(movement), 3), -3) # fix to range [-3, 3]
            while movement:
                if movement > 0:
                    if testmaze.is_permissible(robot_pos['location'], robot_pos['heading']):
                        robot_pos['location'][0] += dir_move[robot_pos['heading']][0]
                        robot_pos['location'][1] += dir_move[robot_pos['heading']][1]
                        movement -= 1
                    else:
                        print "Movement stopped by wall."
                        movement = 0
                else:
                    rev_heading = dir_reverse[robot_pos['heading']]
                    if testmaze.is_permissible(robot_pos['location'], rev_heading):
                        robot_pos['location'][0] += dir_move[rev_heading][0]
                        robot_pos['location'][1] += dir_move[rev_heading][1]
                        movement += 1
                    else:
                        print "Movement stopped by wall."
                        movement = 0

            # check for goal entered
            goal_bounds = [testmaze.dim/2 - 1, testmaze.dim/2]
            if robot_pos['location'][0] in goal_bounds and robot_pos['location'][1] in goal_bounds:
                hit_goal = True
                if run != 0:
                    runtimes.append(total_time - sum(runtimes))
                    run_active = False
                    print "Goal found; run {} completed!".format(run)
                    
    # Report score if robot is successful.
    if len(runtimes) == 2:
        print "Task complete! Score: {:4.3f}".format(runtimes[1] + train_score_mult*runtimes[0])

Starting run 0.
[0, 0] [ 0,11, 0]   0 => u,  1 steps = [0, 1]
[0, 1] [ 0,10, 0]   0 => u,  1 steps = [0, 2]
[0, 2] [ 0, 9, 3]   0 => u,  1 steps = [0, 3]
[0, 3] [ 0, 8, 0]   0 => u,  1 steps = [0, 4]
[0, 4] [ 0, 7, 0]   0 => u,  1 steps = [0, 5]
[0, 5] [ 0, 6, 0]   0 => u,  1 steps = [0, 6]
[0, 6] [ 0, 5, 2]  90 => r,  1 steps = [1, 6]
[1, 6] [ 1, 1, 3]   0 => r,  1 steps = [2, 6]
[2, 6] [ 1, 0, 2]  90 => d,  1 steps = [2, 5]
[2, 5] [ 1, 1, 0] -90 => r,  1 steps = [3, 5]
[3, 5] [ 2, 0, 1] -90 => u,  1 steps = [3, 6]
[3, 6] [ 0, 1, 0]   0 => u,  1 steps = [3, 7]
[3, 7] [ 1, 0, 0] -90 => l,  1 steps = [2, 7]
[2, 7] [ 3, 0, 0] -90 => d,  1 steps = [2, 6]
[2, 6] [ 0, 2, 2]   0 => d,  1 steps = [2, 5]
[2, 5] [ 1, 1, 0] -90 => r,  1 steps = [3, 5]
[3, 5] [ 2, 0, 1]  90 => d,  1 steps = [3, 4]
[3, 4] [ 3, 0, 0] -90 => r,  1 steps = [4, 4]
[4, 4] [ 5, 2, 1] -90 => u,  1 steps = [4, 5]
[4, 5] [ 0, 4, 0]   0 => u,  1 steps = [4, 6]
[4, 6] [ 0, 3, 0]   0 => u,  1 steps = [4, 7]
[4, 7] [ 0, 2, 2] 

KeyboardInterrupt: 