The following code template comes with placeholders (marked with `YOUR CODE HERE`) and has to be filled with the appropriate logic as described in the function descriptions.
Some cells consist of assertions about the expected functionality, and aid you in getting it right.
They are not meant to be edited.
Moreover, there are also hidden assertions that will test your environment for edge cases.
Each correct assertion rewards you points for this assignment.

Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [17]:
NAME = "Shivaram Goutham Suresh"
COLLABORATORS = "Pramod Mahajan Chikkaballekere Manjunatha"

_Please note that only one collaborator is allowed in all assignments, that is, only two persons per group. We expect independent submissions per group. All deviations will be sanctioned._

---

# Create an RL environment
Use case: Cyclic race track

Inspired by Barto & Sutton: Reinforcement Learning: An Introduction (2nd edition). Exercise 5. 12.

![](RaceTrack.png)

In the upcoming tutorial (topic: Monte Carlo RL), we will need an RL environment to interact with.
The specific environment that we will examine is that of an rectangular race track.

* The race track is a grid-world with one agent, where each field is either of the following: street, wall, start line, finish line.
* In each episode, the agent is supposed to start at a random position within the start line and has to reach the finish line as fast as possible.
* Upon hitting a wall, the agent is teleported back to the starting line (again at a random point) without terminating the episode.
* Each action taken by the agent results in a reward of -1.
* The agent can choose from nine actions: Accelerating forward, staying idle, or accelerating backward in either or both directions.
* Acceleration increments/decrements the agents velocity up to a maximum velocity in the corresponding direction.
* Before updating the agents’s location at each time step, check to see if the projected path of the car intersects the track boundary.
  * Intersecting a wall leads to back teleportation.
  * Intersecting with the finish line leads to episode termination.

In this homework assignment, you are supposed to write a python class representing such a race track that can be configured to arbitrary dimensions.
The interface of this class is geared to the specifications of a typical [OpenAI gym environment](https://gym.openai.com/).
However, the gym package is not required for this assignment.

## 1. Race Track Environment (25 points)

In this part of the assignment, you are supposed to write the logic behind a race track environment.

In [18]:
import numpy as np
import random


class RaceTrackEnv:
    """
    RaceTrackEnv object maintains and updates the race track 
    state. Interaction with the class is through
    the step() method (see OpenAI Gym interface)
    
    The class constructor is given a race course as a list of 
    strings. The constructor loads the course and initializes 
    the environment state.
    """
    
    lbl2int = {'o': 1, '-': 0, '+': 2, 'W': -1}

    
    def __init__(self, course):
        """
        Load race course, set any min or max limits in the 
        environment (e.g. max speed), and set initial state.
        Initial state is random position on start line with 
        velocity = (0, 0).
        
        Example:
            tiny_right_turn_course = 
                  ['WWWWWW',
                   'Woooo+',
                   'Woooo+',
                   'WooWWW',
                   'WooWWW',
                   'WooWWW',
                   'WooWWW',
                   'W--WWW',]
        
        Args:
            course: List of text strings used to construct
                race-track.
                    '-': start line
                    '+': finish line
                    'o': track
                    'W': wall
        
        Returns:
            self
        """
        self.MAX_VELOCITY = 5
        self.start_positions = []
        self.finish_positions = []
        self.course = np.empty((len(course), len(course[0])), dtype=np.int8)
        self.position = np.empty(2, dtype=np.int8)
        self._load_course(course)
        self._random_start_position()
        self.velocity = np.array([0, 0], dtype=np.int8)
        self.bounds = (len(course), len(course[0]))
        
    
    # Find the intersections of the line with the cells, when the slope is less than 1
    def _find_intersections_low(self, start_x, start_y, end_x, end_y):
        dx = end_x - start_x
        dy = end_y - start_y
        y_increment = 1
        
        if dy < 0:
            y_increment = -1
            dy = -dy
        
        diff = 2*dy - dx
        y = start_y
        
        intersections = []
        for x in range(start_x, end_x+1):
            intersections.append([y,x])
            if diff > 0:
                y += y_increment
                diff = diff - 2*dx
            diff += 2*dy
        
        return intersections
    
    
    # Find the intersections of the line with the cells, when the slope is more than 1
    def _find_intersections_high(self, start_x, start_y, end_x, end_y):
        dx = end_x - start_x
        dy = end_y - start_y
        x_increment = 1
        
        if dx < 0:
            x_increment = -1
            dx = -dx
            
        diff= 2*dx - dy
        x = start_x
        
        intersections = []
        for y in range(start_y, end_y+1):
            intersections.append([y,x])
            if diff > 0:
                x +=  x_increment
                diff = diff - 2*dy
            diff += 2*dx
            
        return intersections
        
        
    def step(self, action):
        """
        Perform given action on the environment.
        The reward for every action and state is -1, except for when 
        reaching the finish line.
        
        Args:
            action: integer-encoded {0-8} or a 
                2-tuple (a_y, a_x) in {[-1, 0, 1], [-1, 0, 1]}.
        
        Returns:
            4-tuple: (new_state, reward, done, info)
                new_state: the new state resulting from taken action
                reward: The reward obtained from taken action
                done: Bool. Whether we are in a terminal state.
                Info: Dict. Arbitrary information.
                
        """
        # Get the result of action applied to current state
        s_y, s_x, s_vy, s_vx, a_y, a_x = self.state_action(self.get_state(), action)
        s_vy = min(s_vy + a_y, self.MAX_VELOCITY)
        s_vx = min(s_vx + a_x, self.MAX_VELOCITY)
        
        # Calculated next state positions, bounded by length and breadth of the course
        ns_y = min(s_y + s_vy, self.bounds[0] - 1)
        ns_x = min(s_x + s_vx, self.bounds[1] - 1)

#         print('current pos: ', s_y, s_x)
#         print('current vel: ', s_vy, s_vx)
#         print('action: ', a_y, a_x)
#         print('next pos: ', ns_y, ns_x)
#         print('next vel: ', s_vy, s_vx)
        
        a = (ns_y - s_y)
        b = (s_x - ns_x)
        c = a*s_x + b*s_y
        
        intersections = [] # List to store the intersecting cell coordinates 
        if (ns_x - s_x) == 0: # Slope infinity
            x = s_x
            if s_y > ns_y:
                for y in range(ns_y, s_y+1):
                    intersections.append([y,x])
            else:
                for y in range(s_y, ns_y+1):
                    intersections.append([y,x])
        else:
            if abs(ns_y - s_y) < abs(ns_x - s_x): # Slope < 1
                if s_x > ns_x:
                    intersections = self._find_intersections_low(ns_x, ns_y, s_x, s_y)
                else:
                    intersections = self._find_intersections_low(s_x, s_y, ns_x, ns_y)
            elif abs(ns_y - s_y) > abs(ns_x - s_x): # Slope > 1
                if s_y > ns_y:
                    intersections = self._find_intersections_high(ns_x, ns_y, s_x, s_y)
                else:
                    intersections = self._find_intersections_high(s_x, s_y, ns_x, ns_y)
            else: # abs(Slope) = 1
                if (ns_x > s_x and ns_y > s_y): 
                    j = s_y
                    for i in range(s_x, ns_x+1):
                        intersections.append([j,i])
                        j += 1
                elif (s_x > ns_x and s_y > ns_y):
                    j = ns_y
                    for i in range(ns_x, s_x+1):
                        intersections.append([j,i])
                        j += 1
                else:
                    if s_y > ns_y and ns_x > s_x:
                        j = s_y
                        for i in range(s_x, ns_x+1):
                            intersections.append([j,i])
                            j -= 1
                    elif ns_y > s_y and s_x > ns_x:
                        i = s_x
                        for j in range(s_y, ns_y+1):
                            intersections.append([j,i])
                            i -= 1

        done = False
        # Check for obstacle and finish line crossing
        for row, col in intersections:
            if self.course[row, col] == self.lbl2int['W']:
                self.reset()
                break
            else:
                if self._is_finish([row,col]): 
                    done = True
                    self.position = [ns_y, ns_x]
                    self.velocity = [s_vy, s_vx]
                    break
                else:
                    self.position = [ns_y, ns_x]
                    self.velocity = [s_vy, s_vx]
        
        reward = -1
        next_state = (self.position, self.velocity)
        
        return next_state, reward, done, _
        
        
    def get_state(self):
        """Return 2-element-tuple: (position, velocity). Each is a 2D numpy array."""
        return tuple([self.position.copy(), self.velocity.copy()])
            
            
    def reset(self):
        """Set velocity to 0 in both directions and set the position to any
        of the possible start positions.
        Returns the resulting state."""
        self._random_start_position()
        self.velocity[0] = 0
        self.velocity[1] = 0
        return self.get_state()

    
    def _random_start_position(self):
        """Set agent to random position on start line."""
        self.position = np.asarray(self.start_positions[random.randint(0, len(self.start_positions) - 1)])
    
    
    def _load_course(self, course):
        """Load given course. The course is expected to be a list of strings.
        Each string represents a horizontal line of the track.
        See __init__ doc.
        The course is to be internally represented as numpy array."""
        y_idx = 0
        for row in course:
            x_idx = 0
            for cell in row:
                cell_value = self.lbl2int[cell]
                self.course[y_idx, x_idx] = cell_value
                
                if cell_value == 0:
                    self.start_positions.append((y_idx, x_idx))
                elif cell_value == 2:
                    self.finish_positions.append((y_idx, x_idx))
                
                x_idx += 1
            y_idx += 1
    
    
    def _is_finish(self, pos):
        """Return True if given position is in finish line"""
        for finish_position in self.finish_positions:
            if pos[0] == finish_position[0] and pos[1] == finish_position[1]:
                return True
        return False
    
    
    def is_terminal_state(self):
        """Return True at episode terminal state"""
        return self._is_finish(self.position)
    
    
    def action_to_tuple(self, a):
        """Convert integer action to 2-tuple: (ay, ax)"""
        q,r = divmod(a,3)
        return tuple((q-1, r-1))
    
    
    def tuple_to_action(self, a):
        """Convert 2-tuple to integer action: {0-8}.
        Since there are two axes that can go forward, backward or 
        idle, we have 3² actions"""
        return ((a[0]+1)*3 + (a[1]+1))
    
    
    def state_action(self, s, a):
        """Build a state-action tuple for indexing Q NumPy array."""
        if not isinstance(a, tuple):
            a = self.action_to_tuple(a)
        p, v = s
        s_y, s_x = p[0], p[1]
        s_vy, s_vx = v[0], v[1]
        a_y, a_x = a[0], a[1]
        return s_y, s_x, s_vy, s_vx, a_y, a_x
    

#     def render():
#         """Render the current position of the agent within the track text-based"""
#         # YOUR CODE HERE
#         raise NotImplementedError()

The following read-only cells are full of (potentially hidden) assertions, which test your implementation.
Having all assertions correct leads to a full score for this assignment.

In [19]:
test_course = ['WWWWWW',
               'Woooo+',
               'Woooo+',
               'WooWWW',
               'WooWWW',
               'WooWWW',
               'WooWWW',
               'W--WWW',]

track = RaceTrackEnv(test_course)

assert len(track.start_positions) > 0
assert track.course is not None
assert track.course.shape[0] == len(test_course)
assert track.course.shape[1] == len(test_course[0])

In [20]:
def check_policy_against_expected_reward(pi, expected_rewards=[]):
    n_episodes = 5
    max_steps = 100
    for trial in range(n_episodes):
        s = track.reset()
        a = pi[tuple(s[0])]
        done = False
        reward = 0
        for _ in range(max_steps):
            s_next, r, done, _ = track.step(a)
            reward += r
            a = pi[tuple(s_next[0])]
            if done: break
        assert reward in expected_rewards, f'reward {reward} is not in expected_rewards {expected_rewards}'

test_policy_1 = {(i, j): (0, 0) for i in range(len(test_course)) for j in range(len(test_course[0]))}
test_policy_1.update({(7, 1): (-1, 0),
                      (7, 2): (-1, 0),
                      (1, 1): (1, 1),
                      (2, 2): (1, 1)})
check_policy_against_expected_reward(test_policy_1, expected_rewards=[-8, -10])

In [21]:
for a in range(9):
    assert track.tuple_to_action(track.action_to_tuple(a)) == a
tup = track.action_to_tuple(0)
assert isinstance(tup, tuple)
assert len(tup) == 2

In [22]:
# this cell contains hidden tests

In [23]:
# this cell contains hidden tests

In [24]:
# this cell contains hidden tests

In [25]:
# this cell contains hidden tests

In [26]:
# this cell contains hidden tests

In [27]:
# this cell contains hidden tests

In [28]:
# this cell contains hidden tests

In [29]:
# this cell contains hidden tests

## 2. Rectangular Course (5 points)

In the following, build a rectangular race track by implementing the logic for the `build_rect_course` function.

In [30]:
def build_rect_course(course_dim, inner_wall_dim):
    """Build a race track given specifications for the outer cyclic street and inner wall dimensions.
    Start and finish line should be placed in the center top. The course dimension specifications
    do not consider a bounding wall around the track, which must be inserted additionally.
    
    Args:
        course_dim: 2-tuple, (y-dim, x-dim): The size of the track without outer walls.
        inner_wall_dim: 2-tuple (y-dim, x-dim): The size of the inner wall
    
    """
    """
    Assumptions: 
    Since number of columns are even number, the start line is chosen to be in 22/2=11 + 1 = 12th column
    The finish line is symmetric to startlinee but a column behind, i.e at 11th column
    The outerwall thickness is one grid
    The inner wall thickness is one grid too, but assumed to be covered within the given data, i.e 4X10 includes thickness of inner wall 
    Note: The whole grid is created as (14+2)X(20+2) list of '0'(path to travel) and then walls are added at mentioned places.
    """
    # YOUR CODE HERE
    RaceTrackGrid = [['o' for x in range(course_dim[1]+2)] for y in range(course_dim[0]+2)] # Adding walls on 4 sides(outer boundary)
        
    #setting outerwall
    outer_wall_start_x = 0
    outer_wall_start_y = 0
    for i in [outer_wall_start_y, outer_wall_start_y+len(RaceTrackGrid)-1]:
        for j in range(outer_wall_start_x, outer_wall_start_x+len(RaceTrackGrid[0])):
            RaceTrackGrid[i][j] = 'W'
    for i in range(outer_wall_start_y, outer_wall_start_y+len(RaceTrackGrid)):
        for j in [outer_wall_start_x, outer_wall_start_x+len(RaceTrackGrid[0])-1]:
            RaceTrackGrid[i][j] ='W'
    
    # setting inner wall
    inner_wall_start_x = outer_wall_start_x + (len(RaceTrackGrid[0]) - inner_wall_dim[1])//2
    inner_wall_start_y = outer_wall_start_y + (len(RaceTrackGrid) - inner_wall_dim[0])//2
    for i in range(inner_wall_start_y, (inner_wall_start_y+inner_wall_dim[0])):
        for j in range(inner_wall_start_x, (inner_wall_start_x + inner_wall_dim[1])):
            RaceTrackGrid[i][j]='W'
    
    # start line
    for i in range(1,inner_wall_start_y):
        RaceTrackGrid[i][len(RaceTrackGrid[0])//2] = '-'
    
    # Finish line
    for i in range(1,inner_wall_start_y):
        RaceTrackGrid[i][(len(RaceTrackGrid[0])//2)-1] = '+'
        
    return RaceTrackGrid

In [31]:
# course dimensions
_course_dim = (14, 20)
_inner_wall_dim = (4, 10)
course = build_rect_course(_course_dim, _inner_wall_dim)
assert isinstance(course, list)
assert len(course) == _course_dim[0] + 2  # boundary walls on top and bottom
assert len(course[0]) == _course_dim[1] + 2  # boundary walls on left and right
assert any('+' in line for line in course[:len(course)//2])
assert any('-' in line for line in course[:len(course)//2])
assert all('W' in line for line in course)
assert all('W' == letter for line in [course[0], course[-1]] for letter in line)
assert all(('W' == line[0]) and ('W' == line[-1]) for line in course)

In [32]:
# this cell contains hidden tests