In [None]:
!pip install jdc
import jdc

In [6]:
from collections import namedtuple

# A directed edge between two states; used as the key type for rewards.
path = namedtuple('path', 'from_ to_')


class Environment:
    """A set of states connected by directed, rewarded paths."""

    def __init__(self, states=None, paths=None):
        """
        Parameters:
        states: optional list of initial states (stored as given, not copied)
        paths: optional dict mapping a path object to its reward (stored as
            given, not copied)
        """
        # Create fresh containers per instance: a literal [] / {} default
        # would be shared by every Environment built without arguments.
        self.states = [] if states is None else states  # All the possible states in the environment.
        self.paths = {} if paths is None else paths  # Maps a path object to its associated reward.

    def add_state(self, name='S'):
        """
        Add a new state to the environment; an already known state is ignored.

        Parameters:
        name: label of the state to add
        """
        if name not in self.states:  # no duplicates, same policy as add_path
            self.states.append(name)

    def add_path(self, path, reward=1):
        """
        Add a new path to the environment and register both of its endpoints
        as states.

        Parameters:
        path: a path
        reward: reward for taking the given path
        """
        # no duplicates: an existing path keeps its original reward
        self.paths.setdefault(path, reward)
        for endpoint in (path.from_, path.to_):
            if endpoint not in self.states:
                self.states.append(endpoint)

    def reward(self, action):
        """Return the reward associated with a path, or 0 if the path does not exist."""
        return self.paths.get(action, 0)

    def __str__(self):
        """Return a string listing the states on one line and each path on its own line."""
        states_text = ' '.join(str(s) for s in self.states)
        paths_text = ' '.join('\n ' + str(p) for p in self.paths)
        return 'States:\n\t' + states_text + '\nPaths:' + paths_text

In [7]:
import random
from collections import namedtuple

# A directed edge between two states; also the key type of the Q-table.
path = namedtuple('path', 'from_ to_')


class Agent:
    """Tabular Q-learning agent.

    The environment only needs to expose ``states`` (a sequence of state
    labels) and ``reward(action)`` (the reward for a ``path``).
    """

    def __init__(self,
                 env,
                 goal_state,
                 explore=0.9,
                 discount_rate=0.9,
                 learning_rate=0.1,
                 iterations=1000,
                 seed=2):
        """
        Parameters:
        env: the environment the agent learns in
        goal_state: the state at which go() stops following actions
        explore: threshold compared against a uniform random draw in learn();
            a draw ABOVE the threshold picks a random destination, any other
            draw takes the best known action. The chance of a random move is
            therefore 1 - explore.
        discount_rate: weight (gamma) of the best future Q-value
        learning_rate: step size (alpha) of each Q-value update
        iterations: number of learning steps performed by learn()
        seed: value passed to random.seed(), which seeds the module-level
            random generator
        """
        self.gamma = discount_rate
        self.alpha = learning_rate
        self.env = env
        self.Q = {}  # dictionary to hold Q-values, keyed by path
        self.len_path_to_goal = 0  # steps taken by the most recent go() call
        self.goal_state = goal_state
        self.explore_or_exploit = explore  # explore/exploit threshold, see above
        self.iterations = iterations
        random.seed(seed)

    def update_Q(self, action, reward):
        """
        Apply one Q-learning update to the given action.

        Parameters:
        action: the path that was taken
        reward: the reward received for taking it
        """
        # An action seen for the first time starts with quality 0. It is
        # registered before looking at future actions, so a self-loop counts
        # as one of its own future actions.
        current_quality = self.Q.setdefault(action, 0)
        next_state = action[1]
        # max achievable quality of the actions known from the next state
        future_quality = max(
            (quality for (start, _), quality in self.Q.items()
             if start == next_state),
            default=0)
        td = reward + (self.gamma * future_quality) - current_quality
        self.Q[action] = current_quality + (self.alpha * td)

    def learn(self):
        """Run `iterations` single-step episodes from random start states,
        printing each decision and finally the learned Q-table."""
        states = self.env.states
        for _ in range(self.iterations):
            # Choose the state label itself rather than a positional index so
            # environments whose states are not labelled 0..n-1 work too.
            from_ = random.choice(states)  # randomly choose starting state
            # Explore or exploit
            x = random.random()
            print(x)
            if x > self.explore_or_exploit:  # Explore
                to_ = random.choice(states)  # randomly choose destination state
                print('Exploring path ', path(from_, to_))
            else:  # Exploit
                max_quality = 0
                to_ = from_  # stay in place when no action has a positive Q-value
                for (fs, ts), quality in self.Q.items():
                    if fs == from_ and max_quality < quality:
                        max_quality = quality
                        to_ = ts
                print('Exploiting path ', path(from_, to_))
            action = path(from_, to_)
            self.update_Q(action, self.env.reward(action))
        print(self.Q)

    def go(self, from_):
        """
        Follow the highest-valued action from `from_`, printing every
        (state, next state) pair, until the goal state is chosen, no action
        with a positive Q-value exists, or more than 20 steps were taken.

        Parameters:
        from_: the state to start from
        """
        self.len_path_to_goal = 0  # every walk gets its own step budget
        current = from_
        while True:
            best_quality = 0
            best_state = None
            for to_ in self.env.states:
                # Pairs never visited during learning count as quality 0.
                quality = self.Q.get(path(current, to_), 0)
                if quality > best_quality:
                    best_quality = quality
                    best_state = to_
            print(current, best_state)
            if best_state is None or best_state == self.goal_state:
                return  # dead end, or goal reached
            # quit if path is > 20
            self.len_path_to_goal += 1
            if self.len_path_to_goal > 20:
                return
            current = best_state

    def tester(self):
        """Print one {state: [Q-values]} dict per start state; the list is
        ordered like env.states and unvisited pairs are shown as 0. The rows
        are also stored in self.matrix."""
        self.matrix = [
            {from_: [self.Q.get(path(from_, to_), 0) for to_ in self.env.states]}
            for from_ in self.env.states
        ]
        for row in self.matrix:
            print(row)

In [None]:
# Build the example world. Every edge listed below gets add_path's default
# reward; states are registered in the order their endpoints first appear.
e = Environment()
for _start, _end in [
    (0, 1), (1, 2), (2, 3), (3, 4),
    (4, 0), (4, 5),
    (5, 0), (5, 1), (5, 2), (5, 3), (5, 4),
]:
    e.add_path(path(_start, _end))
# The self-loop on state 5 is the only edge with a non-default reward.
e.add_path(path(5, 5), 1.1)

In [9]:
# Run the algorithm
# Agent parameters:
#   env: The environment the agent will navigate.
#   goal_state: The goal state the agent is trying to reach.
#   explore: Threshold compared with a random draw in Agent.learn(); a draw
#       above it picks a random destination, so 0.1 explores on most steps.
#   iterations: The number of iterations the agent will use to learn.
a = Agent(env=e, goal_state=5, explore=0.1, iterations=1000)  # Creates an agent
a.learn()  # Trains the agent using the Q-learning algorithm for the specified number of iterations.
a.tester()  # Prints a dictionary which shows the Q-values for each action from each state.
for state in e.states:  # Iterates over each state in the environment.
    print('from ', state)  # Prints the 'from'/starting state
    # Reset the step counter that go() actually checks; otherwise its 20-step
    # limit is shared by all start states instead of applying to each walk.
    a.len_path_to_goal = 0
    # Selects the highest Q-value action until the goal is reached or a maximum path length is exceeded.
    a.go(state)

0.09158478740507359
Exploiting path  path(from_=0, to_=0)
0.8354988781294496
Exploring path  path(from_=2, to_=5)
0.8538343854854736
Exploring path  path(from_=5, to_=2)
0.2122188106686006
Exploring path  path(from_=4, to_=0)
0.6812461849926625
Exploring path  path(from_=4, to_=3)
0.39353182020537136
Exploring path  path(from_=5, to_=5)
0.9493954730932436
Exploring path  path(from_=4, to_=4)
0.5020672922450359
Exploring path  path(from_=3, to_=0)
0.36401442784736937
Exploring path  path(from_=0, to_=2)
0.42361096957169064
Exploring path  path(from_=3, to_=4)
0.5605103610264989
Exploring path  path(from_=1, to_=1)
0.023858079140782196
Exploiting path  path(from_=1, to_=1)
0.1735887537650569
Exploring path  path(from_=2, to_=4)
0.3596914037964013
Exploring path  path(from_=4, to_=4)
0.5598903162914775
Exploring path  path(from_=5, to_=3)
0.7344016918939777
Exploring path  path(from_=3, to_=2)
0.3537869778416035
Exploring path  path(from_=4, to_=3)
0.9549331199676101
Exploring path  path(