In [None]:
import gymnasium as gym
import numpy as np
import os, sys
import time
import sys
import traffic_control_game
import pygame
from collections import defaultdict
import datetime
import matplotlib.pyplot as plt
from tqdm import tqdm
import seaborn as sns
import ast
import json
from matplotlib import cm
from matplotlib.ticker import LinearLocator
import matplotlib
import itertools
import plotly.express as px
import tiles3 as tc

#is_ipython = 'inline' in matplotlib.get_backend()
#if is_ipython:
#    from IPython import display

#plt.ion()



In [None]:
def timer(start_time=None, string=None):
    '''
    Two-phase stopwatch helper.

    Called without a (truthy) start_time it returns the current timestamp.
    Called with that timestamp it prints the elapsed hours / minutes / seconds
    and returns None.

    start_time : datetime previously returned by this function, or None to start timing
    string     : optional task description included in the printed message
    '''
    # Phase 1: nothing to measure yet, hand back "now" as the reference point.
    if not start_time:
        return datetime.datetime.now()

    # Phase 2: split the elapsed seconds into hours, minutes and seconds.
    elapsed_seconds = (datetime.datetime.now() - start_time).total_seconds()
    hours, remainder = divmod(elapsed_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    if string:
        label = "for " + str(string) + " "
    else:
        label = ""
    print("Execution time", label, "is ", hours, " h :", minutes, ' m :', round(seconds, 2), " s")

In [None]:
"""
Functions to save and load dictionaries safely by converting them
to JSON format. Used because of the long training times and the
instability of Colab.
"""

def _to_builtin(value):
    """Recursively convert NumPy scalars/arrays (and tuples/lists of them) to plain Python objects."""
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, tuple):
        return tuple(_to_builtin(item) for item in value)
    if isinstance(value, list):
        return [_to_builtin(item) for item in value]
    return value


def save_json(q, filename):
    """
    Save the dictionary `q` to '<filename>.json'.

    Each value is turned into a list, the whole dictionary is rendered with
    str() and that string is stored as a single JSON string (the format that
    load_json reads back with ast.literal_eval).

    q        : mapping whose values are iterables (e.g. lists or NumPy arrays)
    filename : output path without the '.json' extension

    Raises TypeError if a value of `q` is not iterable.
    """
    # NumPy scalars are converted to builtins first: their repr is not a Python
    # literal on recent NumPy versions and would make the file unreadable.
    # The payload is built BEFORE the file is opened so that a serialisation
    # error cannot truncate an existing checkpoint.
    payload = str({_to_builtin(k): [_to_builtin(item) for item in v] for k, v in q.items()})
    with open(f'{filename}.json', 'w') as fp:
        json.dump(payload, fp)
    

def load_json(filename):
    """
    Read '<filename>.json' and rebuild the stored dictionary.

    The file is expected to hold one JSON string containing the str()
    rendering of a dictionary; that string is parsed with ast.literal_eval.

    filename : input path without the '.json' extension
    """
    with open(f'{filename}.json') as json_file:
        literal_text = json.load(json_file)
    return ast.literal_eval(literal_text)
    

In [2]:
# Environment initialization
# Creates the "traffic_control-v0" environment with n_states=2 and human rendering,
# then prints its observation and action spaces.
# NOTE(review): the environment id is presumably registered by `import traffic_control_game` - confirm.

env = gym.make("traffic_control-v0", n_states = 2, render_mode="human") 

print(f"Observations space: {env.observation_space}")
print(f"Action space: {env.action_space}")


Observations space: Dict('NS': Discrete(76), 'WE': Discrete(92))
Action space: Discrete(6)


In [8]:
# Draw one random observation to inspect what an observation looks like.
env.observation_space.sample()

OrderedDict([('NS', 32), ('WE', 57)])

In [18]:
# Draw a random observation and display it as the cell output.
a = env.observation_space.sample()
# Disabled alternative: flatten the observation values into a NumPy array.
#obs = np.array(list(a.values()))
a

OrderedDict([('NS', 10), ('WE', 1)])

In [21]:


# Inclusive value range of each observation component, derived from the two
# Discrete sub-spaces. Defined here so the cell also runs on a fresh kernel.
ns_space = env.observation_space["NS"]
we_space = env.observation_space["WE"]
x_min, x_max = float(ns_space.start), float(ns_space.start + ns_space.n - 1)
y_min, y_max = float(we_space.start), float(we_space.start + we_space.n - 1)

print(x_min, x_max, y_min, y_max)

0.0 75.0 0.0 91.0


# Semi-gradient Sarsa agent

In [None]:

class TileCoder:
    '''
    Tile-coding front end for the two-component (NS queue, WE queue) observation.

    iht_size          : size passed to tc.IHT (the index hash table)
    num_tilings       : number of tilings requested from tc.tiles
    num_tiles         : number of tiles each observation range is scaled to
    observation_space : optional mapping with "NS" and "WE" entries exposing
                        `.start` and `.n`; when omitted, the notebook-level
                        `env.observation_space` is looked up at call time
    '''
    def __init__(self, iht_size=4096, num_tilings=8, num_tiles=8, observation_space=None):
        self.iht = tc.IHT(iht_size)
        self.num_tilings = num_tilings
        self.num_tiles = num_tiles
        self.observation_space = observation_space

    def _scale(self, value, space):
        ''' Map `value` from the inclusive range of `space` onto [0, num_tiles]. '''
        low = float(space.start)
        high = float(space.start + space.n - 1)
        span = high - low
        if span == 0:
            # Single-valued dimension: every input falls into the same tile
            # (avoids a ZeroDivisionError).
            return 0.0
        return (value - low) * (self.num_tiles / span)

    def get_tiles(self, queue_ns, queue_we):
        '''
        Return the active tile indices for one observation.

        queue_ns, queue_we : the "NS" and "WE" components of the observation
        Returns a NumPy array built from the list returned by tc.tiles.
        '''
        # Prefer the space given at construction; fall back to the global
        # environment so existing callers keep working unchanged.
        space = self.observation_space if self.observation_space is not None else env.observation_space

        queue_ns_scaled = self._scale(queue_ns, space["NS"])
        queue_we_scaled = self._scale(queue_we, space["WE"])

        tiles = tc.tiles(self.iht, self.num_tilings, [queue_ns_scaled, queue_we_scaled])

        return np.array(tiles)
        

In [3]:
# Scratch cell: plot the identity line over 0..9999 on a single 10x10 axes.
# NOTE(review): this rebinds the name `a` used by an earlier cell - confirm that is intended.
a = list(np.arange(10000))

fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.plot(a, a)
plt.show()

: 

: 

In [None]:

class SarsaAgent():
    """
    Semi-gradient Sarsa agent with one weight vector per action over tile-coded features.

    A state may be given either as a mapping with "NS" and "WE" entries (the
    Dict observation of the environment) or as an indexable pair
    (state[0], state[1]). A terminal transition is signalled to `update`
    by passing None or False as the state.
    """
    def __init__(self):
        """ All values are set to None so they can be initialized in the info_init method """
        self.num_tilings = None
        self.num_tiles = None
        self.iht_size = None
        self.initial_weights = None
        self.epsilon = None
        self.epsilon_decay = None
        self.epsilon_min = None
        self.discount = None
        self.num_actions = None
        self.step_size = None
        self.w = None
        self.tc = None
        self.previous_tiles, self.previous_action = None, None

    def info_init(self, info=None):
        """
        Setup for the agent, called when the experiment first starts.

        info : optional dict of hyper-parameters; missing keys fall back to the
               defaults below. Recognised keys: num_tilings, num_tiles, iht_size,
               initial_weights, eps_start, eps_decay, eps_min, discount,
               num_actions, step_size.
        """
        info = {} if info is None else info

        self.num_tilings = info.get("num_tilings", 8)
        self.num_tiles = info.get("num_tiles", 8)
        self.iht_size = info.get("iht_size", 4096)
        self.initial_weights = info.get("initial_weights", 0.0)

        # Epsilon-greedy exploration schedule: start value, geometric decay
        # factor (applied by `decay`) and lower bound.
        self.epsilon = info.get("eps_start", 1.0)
        self.epsilon_decay = info.get("eps_decay", .99999)
        self.epsilon_min = info.get("eps_min", 0.05)

        self.discount = info.get("discount", 1.0)
        self.num_actions = info.get("num_actions", 6)
        # The step size is shared between the tilings that are active at once.
        self.step_size = info.get("step_size", 0.5) / self.num_tilings

        self.w = np.ones((self.num_actions, self.iht_size)) * self.initial_weights

        self.tc = TileCoder(iht_size=self.iht_size,
                            num_tilings=self.num_tilings,
                            num_tiles=self.num_tiles)

    def decay(self):
        """ Classical geometric decay of epsilon as baseline, bounded below by epsilon_min """
        self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)

    @staticmethod
    def _is_terminal(state):
        """ True when `state` is the terminal marker (None / False / empty) """
        if state is None or state is False:
            return True
        if isinstance(state, np.ndarray):
            # bool() of a multi-element array raises, so test emptiness explicitly.
            return state.size == 0
        return not state

    def _tiles_for(self, state):
        """ Tile-coded representation of `state` (mapping with "NS"/"WE" or indexable pair) """
        if isinstance(state, dict):
            queue_ns, queue_we = state["NS"], state["WE"]
        else:
            queue_ns, queue_we = state[0], state[1]
        # Positional call: TileCoder.get_tiles names its parameters queue_ns / queue_we.
        return self.tc.get_tiles(queue_ns, queue_we)

    def get_value(self, state):
        """ Greedy action and its estimated value for `state` (used to plot the estimates) """
        active_tiles = self._tiles_for(state)
        action, value = self.choose_action(active_tiles, greedy=True)
        return action, value

    def argmax(self, values):
        """ Index of the largest entry of `values`, ties broken uniformly at random """
        values = np.asarray(values)
        best_actions = np.flatnonzero(values == values.max())
        return int(np.random.choice(best_actions))

    def play(self, env, fin_score = 30000, print_=False, print_info=False):
        '''
        Run one greedy episode outside of training and return the final score.

        env        : environment following the gymnasium reset/step API whose
                     `info` dict contains a "score" entry
        fin_score  : the episode is stopped once info["score"] reaches this value
        print_     : if True, render after every step
        print_info : if True, print start / end messages
        '''
        state, info = env.reset()
        if print_info:
            print("The game has started...")
        while True:
            active_tiles = self._tiles_for(state)
            action, _ = self.choose_action(active_tiles, greedy=True)
            next_state, reward, terminated, truncated, info = env.step(action)

            if print_:
                # Render the game
                os.system("clear")
                frame = env.render()
                # Only text frames can be written to stdout; render() may return
                # something else (e.g. None) depending on the render mode.
                if isinstance(frame, str):
                    sys.stdout.write(frame)
                time.sleep(0.2) # FPS

            # Stop on termination, truncation, or when the target score is reached.
            if terminated or truncated or (info["score"] >= fin_score):
                to_vis = info["score"]
                if print_info:
                    print(f"\nThe game is done! Final score: {to_vis: ,}\n")
                break
            state = next_state
        env.close()
        return to_vis

    def choose_action(self, tiles, greedy=False):
        '''
        Choose an action according to the epsilon-greedy strategy, based on the
        current tile-based state representation.

        Returns (action, estimated value of that action).
        '''
        action_values = [np.sum(self.w[action][tiles]) for action in range(self.num_actions)]
        # `greedy` is tested first so greedy evaluation does not consume random numbers.
        if (not greedy) and (np.random.random() < self.epsilon):
            chosen = np.random.choice(self.num_actions)
        else:
            chosen = self.argmax(action_values)
        return chosen, action_values[chosen]

    def start(self, state):
        """ Take first move and store first action and tile-based representation of state """
        active_tiles = self._tiles_for(state)
        action, _ = self.choose_action(active_tiles, greedy=False)

        self.previous_tiles = np.copy(active_tiles)
        self.previous_action = action
        return action

    def update(self, reward, state):
        """
        Sarsa update (on-policy method).

        The weights of the previous state-action pair are moved towards
        reward + discount * q(state, next action), where the next action is
        chosen epsilon-greedily in `state`. For a terminal `state`
        (None / False) the bootstrap value is 0.

        Returns the next action, or None on a terminal transition.
        """
        terminal = self._is_terminal(state)
        if terminal:
            action, next_value = None, 0.0
        else:
            active_tiles = self._tiles_for(state)
            action, next_value = self.choose_action(active_tiles, greedy=False)

        previous_value = np.sum(self.w[self.previous_action][self.previous_tiles])
        td_error = reward + self.discount * next_value - previous_value
        # The same increment is applied to every tile that was active.
        self.w[self.previous_action][self.previous_tiles] += self.step_size * td_error

        if terminal:
            return None
        self.previous_tiles = np.copy(active_tiles)
        self.previous_action = action
        return action


In [None]:
# Reset the environment and keep the initial observation and info dict.
obs, info = env.reset()


# Fixed action sequence used for a manual run; the commented lines are alternative sequences.
#actions_loop = [0]*5 + [1]*5 + [0] + [1]*5 + [0] + [1]*5
#actions_loop = [2]*10+[0]
actions_loop = [1]*10