
# Task 1.C
## SIT796 - Joshua Sansom-Sherwill - 220013964
---
## 1. Environment
For our environment we are going to create a traffic light T-intersection.
The traffic light environment was defined in Task 1.P, but I will give a brief summary of what it does.

* A negative reward is given for "traffic" waiting at the intersection.
* A positive reward is given for traffic flowing through the intersection.
* The longer traffic waits at the intersection, the lower the reward.
* The intersection is made up of 3 "phases". Each of these phases represents directions in which traffic can travel (think of a T-intersection).
* A traffic control policy can choose from only 4 actions: 1 for each phase, and 1 for doing nothing.
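
The reward rules above can be illustrated with a small lookup. This is a minimal sketch that reuses the reward table from the environment code in this notebook; the names `REWARD_TABLE` and `lookup_reward` are hypothetical and only serve the illustration.

```python
# Rows: flow category (0 = no flow, 1 to 3 = index of the flowing lane + 1).
# Columns: wait category (0 = none, 1 = short, 2 = medium, 3 = long).
REWARD_TABLE = [
    [0, -2, -3, -4],
    [1, -1, -2, -3],
    [2,  0, -1, -2],
    [3,  1,  0, -1],
]


def lookup_reward(flow_category, wait_category):
    """Return the reward for a given flow and wait category (hypothetical helper)."""
    return REWARD_TABLE[flow_category][wait_category]


# Within any row, the reward drops as the wait category grows.
print(lookup_reward(3, 0))  # flow, nobody waiting
print(lookup_reward(3, 3))  # same flow, long wait
```

Reading along any row shows the "longer wait, lower reward" rule: each step to the right lowers the reward.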

The code below is an attempt to define this environment for the gym framework.


---




In [5]:
import gym
from gym import spaces
import numpy as np
import random as rand

class TrafficControlEnv(gym.Env):
  # create the properties required to represent
  # and update the state of our traffic intersection
  def __init__(self, u, r):
    super(TrafficControlEnv, self).__init__()
    # action & observation spaces
    self.action_space = spaces.Discrete(4)
    # the observation is 9 binary flags: phase (3), traffic (3), flow (3)
    self.observation_space = spaces.MultiBinary(9)
    self.u = u # phase transition time
    self.r = r # number of time steps per waiting-time category
    self.active_phase = np.array([0,0,0])
    self.transition_time = 0
    self.waiting_time = np.array([0,0,0])
    self.flow = np.array([0,0,0])
    rand.seed(None)

  # clean state
  def reset(self):
    # All traffic is stopped at start
    self.phase = np.array([0,0,0])
    # no phase is active until the first transition completes
    self.active_phase = np.array([0,0,0])
    # reset to some initial traffic
    self.traffic = np.array([0,1,0]) 
    # no traffic flow yet
    self.flow = np.array([0,0,0]) 
    # total time transitioning from one phase to another
    self.transition_time = 0 
    # each phase's respective time where traffic has spent waiting
    self.waiting_time = np.array([0,0,0]) 

    return self._observe_traffic()

  # step through time using new action
  def step(self, action):
    previous_phase = get_index(self.phase, 1)
    self._perform(action)

    # calculate waiting
    # need to consider whether a phase transition is taking place
    if self.transition_time < self.u:
      self.transition_time += 1
    else:
      self.active_phase = self.phase
  
    # update our traffic waiting time 
    # based on phase, action, and transition times
    self._update_waiting_time()
    
    # add random traffic
    self._add_traffic()

    # randomly define flow of traffic
    self._update_flow()

    reward = self._get_reward()
    
    # get our obs array
    obs = self._observe_traffic()

    # the episode ends if the phase was changed while the previous phase
    # was still in transition: overlapping phase transitions are not allowed
    # at our traffic lights because they could cause an accident
    done = previous_phase != get_index(self.phase, 1) and previous_phase != get_index(self.active_phase, 1)

    return obs, reward, done, {}
  
  # calculate the reward of current state
  def _get_reward(self):
    # get the category of highest waiting time
    # None, Short, Medium, Long
    max_wait = min(max(self.waiting_time), 3)
    # integer division buckets the wait into bins of width r
    wait_category = int(max_wait // self.r)
    flow_category = get_index(self.flow,1) + 1
    # create our reward lookup table
    reward_table = [[0,-2,-3, -4],
                    [1,-1,-2,-3],
                    [2,0,-1,-2],
                    [3,1,0,-1]]
    return reward_table[flow_category][wait_category]
    

  # randomly define and add some traffic
  # under a certain probability constraint
  # to simulate gradual updates of traffic
  def _add_traffic(self):
    # randomly add some traffic
    # (randint is inclusive at both ends, so randint(0,10) >= 9 fired about 18% of the time)
    add_traffic = rand.random() < 0.1 # add traffic 10% of the time
    if add_traffic:
      new_traffic = rand.randint(0,2) 
      self.traffic[new_traffic] = 1

  # randomly update the traffic flow of the intersection
  def _update_flow(self):
    self.flow = np.array([0,0,0])
    self.flow[rand.randint(0,2)] = 1

  # return discrete observation from traffic intersection model
  def _observe_traffic(self):
    return np.append(np.append(self.phase, self.traffic), self.flow)

  def _perform(self, action):
    assert self.action_space.contains(action)
    previous_phase = self.phase

    if action == 0: # noop: keep the current phase
      pass
    elif action == 1: # phase 1 # Allow middle/center lane to flow
      self.phase = np.array([1,0,0])
    elif action == 2: # phase 2 # Allow left lane to flow
      self.phase = np.array([0,1,0])
    elif action == 3: # phase 3 # Allow right lane to flow
      self.phase = np.array([0,0,1])

    # account for transition time
    if get_index(previous_phase, 1) != get_index(self.phase, 1):
      self.transition_time = 0

  # calculate a discrete value of waiting time based on current traffic
  def _update_waiting_time(self):
    # Calculate the waiting times considering active phase
    self.traffic = self.traffic - self.active_phase

    # increment items that are still waiting
    self.waiting_time += self.traffic 
    # remove items that are no longer waiting
    no_traffic_indexes = [i for i in range(len(self.traffic)) if self.traffic[i] <= 0]
    for i in no_traffic_indexes:
      self.waiting_time[i] = 0 
      self.traffic[i] = 0



# return the index of the first occurrence of val in a 1D array
# return -1 if it is not found
def get_index(arr, val):
  indexes = np.where(arr == val)
  if len(indexes[0]) > 0:
    return indexes[0][0]
  else:
    return -1
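
The waiting-time bucketing in `_get_reward` can be checked in isolation. The following is a minimal sketch that reproduces that calculation in a standalone helper; the name `wait_category` is hypothetical and not part of the environment class.

```python
def wait_category(waiting_times, r):
    """Bucket the longest wait into a category (hypothetical standalone helper)."""
    # Cap the longest wait at 3 time steps, as the environment does ...
    max_wait = min(max(waiting_times), 3)
    # ... then place it into bins of width r.
    return max_wait // r


# Print the category for waits of 0 to 4 time steps with r = 2.
for wait in range(5):
    print(wait, wait_category([wait, 0, 0], r=2))
```

With `r=2`, the value used later in this notebook, only categories 0 and 1 are reachable, because the wait is capped at 3 before it is divided by `r`. The last two columns of the reward table are therefore never used with that setting.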

## 2. Policy
Here I create a policy to manage the traffic light environment we have created above.

This policy does not build a statistical model to help it choose actions based on previous rewards; it is purely a rule-based model.

Here are the rules this policy is going to implement in order to avoid accidents and maximize traffic flow.

1. Given traffic, begin phase transition with respect to that traffic.
2. Each phase has a minimum time to run of 4 time steps.
3. We will always initiate the phase for which we spotted traffic the earliest.
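
The three rules can be sketched as a single function. This is a simplified, stateless illustration under stated assumptions: the name `rule_based_action` is hypothetical, and unlike the `PolicyModel` class in this notebook it does not remember traffic between time steps, so ties and ordering are resolved by lane index only.

```python
def rule_based_action(traffic, t, min_phase=4):
    """Return an action (0 = do nothing, 1 to 3 = start that phase)."""
    # Rule 2: a phase runs for at least min_phase steps,
    # so only act on time steps that are multiples of min_phase.
    if t % min_phase != 0:
        return 0
    # Rules 1 and 3 (simplified): start the phase for the first lane with traffic.
    for lane, waiting in enumerate(traffic):
        if waiting == 1:
            return lane + 1
    # No traffic anywhere: nothing to do.
    return 0


print(rule_based_action([0, 1, 0], t=0))  # traffic in lane index 1 at an allowed step
print(rule_based_action([0, 1, 0], t=1))  # same traffic, but mid-phase
```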

---

In [6]:
# This class is going to manage our policy actions
# some state like last known traffic is important to remember between time steps
class PolicyModel:
  def __init__(self):
    self.min_phase = 4 # minimum time steps for one phase of traffic
    self.first_traffic = -1 # index of the first lane where traffic was spotted (-1 if none)

  def policy(self, obs, t):
    # if no traffic is currently remembered, look for some in the observation
    if self.first_traffic == -1:
      traffic = obs[3:6] # find the traffic information from our discrete observation
      self.first_traffic = get_index(traffic, 1) # find the index of any active traffic

    # we may only act every min_phase time steps, so in between we can't do anything yet
    # or if we have no traffic, we know we also don't need to do anything yet
    if t % self.min_phase != 0 or self.first_traffic == -1:
      return 0 # do nothing action

    action = self.first_traffic + 1
    # forget this traffic so that the next call looks for new traffic
    self.first_traffic = -1

    return action




## 3. Execute & Test
Here we set up our traffic environment using the standard Gym environment boilerplate code. However, we are going to use our own policy model to choose which action to take next.

Importantly, there is no winning state for this game; the traffic control policy just has to earn as much reward as possible within the defined time limit.
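
Since the goal is to accumulate reward over the time limit, it can help to sum the rewards of an episode rather than only print them. The following is a minimal sketch, assuming the older Gym interface used in this notebook, where `step` returns a 4-tuple. The helper `run_episode` and the stand-in `StubEnv` are hypothetical and exist only so the example runs on its own.

```python
def run_episode(env, policy, time_limit):
    """Run one episode and return (total_reward, steps_taken)."""
    obs = env.reset()
    total_reward = 0
    for t in range(time_limit):
        action = policy(obs, t)
        obs, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            # the episode ended early (for example, a failed task)
            return total_reward, t + 1
    return total_reward, time_limit


class StubEnv:
    """Hypothetical stand-in environment: reward 1 per step, never terminates."""

    def reset(self):
        return [0] * 9

    def step(self, action):
        return [0] * 9, 1, False, {}


total, steps = run_episode(StubEnv(), lambda obs, t: 0, time_limit=100)
print(total, steps)
```

With the classes from this notebook, the equivalent call would be `run_episode(TrafficControlEnv(3, 2), PolicyModel().policy, TIME_LIMIT)`.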

---


In [7]:
TIME_LIMIT = 100

# Create new traffic control gym environment
env = TrafficControlEnv(3, 2)
# create policy instance
model = PolicyModel()

# Reset to clear any state from earlier runs
o = env.reset() 
# for each time step
for t in range(TIME_LIMIT):
    # choose action from policy
    action = model.policy(o,t)
    # take step given action
    o, r, d, _ = env.step(action) 
    # the reward is not used by our policy because
    # it only follows some basic fixed rules.

    # print reward for each given action so we can see
    # what our policy is doing to the environment
    print("Reward {0}, Action {1}".format(r, action))

    if d and t<TIME_LIMIT-1:
        print("Task failed in", t, "time steps")
        break
else:
    print("Time limit reached.")

env.reset()

Reward 3, Action 2
Reward 0, Action 0
Reward -1, Action 0
Reward 2, Action 0
Reward 1, Action 2
Reward 1, Action 0
Reward 1, Action 0
Reward 3, Action 0
Reward 2, Action 0
Reward 2, Action 0
Reward 3, Action 0
Reward -1, Action 0
Reward 1, Action 1
Reward 1, Action 0
Reward -1, Action 0
Reward 3, Action 0
Reward 3, Action 1
Reward -1, Action 0
Reward -1, Action 0
Reward 0, Action 0
Reward 0, Action 3
Reward -1, Action 0
Reward -1, Action 0
Reward 3, Action 0
Reward -1, Action 3
Reward 1, Action 0
Reward 1, Action 0
Reward -1, Action 0
Reward -1, Action 2
Reward 1, Action 0
Reward -1, Action 0
Reward 2, Action 0
Reward 1, Action 2
Reward 2, Action 0
Reward 2, Action 0
Reward 3, Action 0
Reward 2, Action 0
Reward 1, Action 0
Reward 3, Action 0
Reward 1, Action 0
Reward 2, Action 2
Reward 3, Action 0
Reward 2, Action 0
Reward 3, Action 0
Reward 1, Action 2
Reward 2, Action 0
Reward 1, Action 0
Reward 3, Action 0
Reward 3, Action 0
Reward 3, Action 0
Reward 3, Action 0
Reward 1, Action 0
R

array([0, 0, 0, 0, 1, 0, 0, 0, 0])

## 4. Summary
From the output above we can see that our policy does a decent job of choosing actions that earn high rewards. However, this was achieved using rules based on knowledge of the environment and its constraints. In the future I would like to implement an RL-based approach to optimizing these decisions, one that takes the rewards into account and builds its own internal value function rather than relying on a policy function devised from the mechanics of the game.

## 5. Reference List
1. Adam King, "Create custom gym environments from scratch — A stock market example", Towards Datascience,  https://towardsdatascience.com/creating-a-custom-openai-gym-environment-for-stock-trading-be532be3910e (Accessed March 13, 2021)