##Libraries

In [2]:
import gymnasium as gym
import numpy as np
from tensorflow import keras
import matplotlib.pyplot as plt
import time
import random
import math
from gymnasium.envs.toy_text.frozen_lake import generate_random_map

## Hyperparameters

In [3]:
GAMMA = 0.9
MEMORY_SIZE = 1000
LEARNING_RATE = 0.001
BATCH_SIZE = 32
EXPLORATION_MAX = 1
EXPLORATION_MIN = 0.01
EXPLORATION_DECAY = 0.995
NUMBER_OF_EPISODES_FOR_TRAINING = 120
NUMBER_OF_EPISODES_FOR_TESTING = 20
ACTIVATION = "relu"
OPTIMIZER = ""
INITIALIZER = "he_normal"
ENVIRONMENT = "FrozenLake-v1"

In [4]:
class ReplayMemory:

    def __init__(self,number_of_observations):
        # Create replay memory
        self.states = np.zeros((MEMORY_SIZE, number_of_observations))
        self.states_next = np.zeros((MEMORY_SIZE, number_of_observations))
        self.actions = np.zeros(MEMORY_SIZE, dtype=np.int32)
        self.rewards = np.zeros(MEMORY_SIZE)
        self.terminal_states = np.zeros(MEMORY_SIZE, dtype=bool)
        self.current_size=0

    def store_transition(self, state, action, reward, state_next, terminal_state):
        # Store a transition (s,a,r,s') in the replay memory
        i = self.current_size
        self.states[i] = state
        self.states_next[i] = state_next
        self.actions[i] = action
        self.rewards[i] = reward
        self.terminal_states[i] = terminal_state
        self.current_size = i + 1

    def sample_memory(self, batch_size):
        # Generate a sample of transitions from the replay memory
        batch = np.random.choice(self.current_size, batch_size)
        states = self.states[batch]
        states_next = self.states_next[batch]
        rewards = self.rewards[batch]
        actions = self.actions[batch]
        terminal_states = self.terminal_states[batch]
        return states, actions, rewards, states_next, terminal_states

In [8]:
class DQN:

    def __init__(self, number_of_observations, number_of_actions):
        # Initialize variables and create neural model
        self.exploration_rate = EXPLORATION_MAX
        self.number_of_actions = number_of_actions
        self.number_of_observations = number_of_observations
        self.scores = []
        self.memory = ReplayMemory(number_of_observations)
        self.model = keras.models.Sequential()
        self.model.add(keras.layers.Dense(24, input_shape=(number_of_observations,), \
                             activation=ACTIVATION,kernel_initializer=INITIALIZER))
        self.model.add(keras.layers.Dense(24, activation=ACTIVATION,kernel_initializer=INITIALIZER))
        self.model.add(keras.layers.Dense(number_of_actions, activation="linear"))
        self.model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE))

    def remember(self, state, action, reward, next_state, terminal_state):
        # Store a tuple (s, a, r, s') for experience replay
        state = np.reshape(state, [1, self.number_of_observations])
        next_state = np.reshape(next_state, [1, self.number_of_observations])
        self.memory.store_transition(state, action, reward, next_state, terminal_state)

    def select(self, state):
        # Generate an action for a given state using epsilon-greedy policy
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.number_of_actions)
        else:
            state = np.reshape(state, [1, self.number_of_observations])
            q_values = self.model.predict(state, verbose=0)
            return np.argmax(q_values[0])

    def select_greedy_policy(self, state):
        # Generate an action for a given state using greedy policy
        state = np.reshape(state, [1, self.number_of_observations])
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def learn(self):
        # Learn the value Q using a sample of examples from the replay memory
        if self.memory.current_size < BATCH_SIZE: return

        states, actions, rewards, next_states, terminal_states = self.memory.sample_memory(BATCH_SIZE)

        q_targets = self.model.predict(states, verbose=0)
        q_next_states = self.model.predict(next_states, verbose=0)

        for i in range(BATCH_SIZE):
             if (terminal_states[i]):
                  q_targets[i][actions[i]] = rewards[i]
             else:
                  q_targets[i][actions[i]] = rewards[i] + GAMMA * np.max(q_next_states[i])

        self.model.train_on_batch(states, q_targets)

        # Decrease exploration rate
        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)

    def add_score(self, score):
       # Add the obtained score to a list to be presented later
        self.scores.append(score)

    def delete_scores(self):
       # Delete the scores
        self.scores = []

    def display_scores_graphically(self):
        # Display the obtained scores graphically
        plt.plot(self.scores)
        plt.xlabel("Episode")
        plt.ylabel("Score")

In [9]:
def create_environment():
    # Create simulated environment
    environment = gym.make(ENVIRONMENT, desc=generate_random_map(size=8), map_name="4x4", is_slippery=True, render_mode="rgb_array")
    number_of_observations = environment.observation_space.n
    number_of_actions = environment.action_space.n
    return environment, number_of_observations, number_of_actions

## Training program




In [12]:
environment, number_of_observations, number_of_actions = create_environment()
agent = DQN(number_of_observations, number_of_actions)
episode = 0
start_time = time.perf_counter()
while (episode < NUMBER_OF_EPISODES_FOR_TRAINING):
    episode += 1
    score = 0
    state, info = environment.reset()
    print(state)
    end_episode = False
    while not(end_episode):
        # Select an action for the current state
        action = agent.select(state)

        # Execute the action on the environment
        state_next, reward, terminal_state, truncated, info = environment.step(action)
        
        print(state_next, reward, terminal_state, truncated, info)
        
        #Pintar el entorno
        environment.render()

        # Store in memory the transition (s,a,r,s')
        agent.remember(state, action, reward, state_next, terminal_state)

        score += reward

        # Learn using a batch of experience stored in memory
        agent.learn()

        # Detect end of episode
        if terminal_state or truncated:
            agent.add_score(score)
            print("Episode {0:>3}: ".format(episode), end = '')
            print("score {0:>3} ".format(math.trunc(score)), end = '')
            print("(exploration rate: %.2f, " % agent.exploration_rate, end = '')
            print("transitions: " + str(agent.memory.current_size) + ")")
            end_episode = True
        else:
            state = state_next

print("Time for training:", round((time.perf_counter() - start_time)/60), "minutes")
print("Score (max):", max(agent.scores))
average_score = np.mean(agent.scores[max(0,(len(agent.scores)-10)):(len(agent.scores))])
print("Score (average last 10 episodes):", average_score)

agent.display_scores_graphically()

0
0 0.0 False False {'prob': 0.3333333333333333}
0


ValueError: cannot reshape array of size 1 into shape (1,64)


## Testing program



In [7]:
agent.delete_scores()
episode = 0
start_time = time.perf_counter()
while (episode < NUMBER_OF_EPISODES_FOR_TESTING):
    episode += 1
    score = 0
    state, info = environment.reset()
    end_episode = False
    while not(end_episode):
        # Select an action for the current state
        action = agent.select_greedy_policy(state)

        # Execute the action in the environment
        state_next, reward, terminal_state, truncated, info = environment.step(action)

        score += reward

        # Detect end of episode and print
        if terminal_state or truncated:
            agent.add_score(score)
            print("Episode {0:>3}: ".format(episode), end = '')
            print("score {0:>3} \n".format(math.trunc(score)), end = '')
            end_episode = True
        else:
            state = state_next

print("Time for testing:", round((time.perf_counter() - start_time)/60), "minutes")
print("Score (average):", np.mean(agent.scores))
print("Score (max):", max(agent.scores))

Episode   1: score 500 
Episode   2: score 500 
Episode   3: score 500 
Episode   4: score 500 
Episode   5: score 500 
Episode   6: score 500 
Episode   7: score 500 
Episode   8: score 263 
Episode   9: score 500 
Episode  10: score 500 
Episode  11: score 500 
Episode  12: score 304 
Episode  13: score 492 
Episode  14: score 279 
Episode  15: score 500 
Episode  16: score 500 
Episode  17: score 500 
Episode  18: score 500 
Episode  19: score 500 
Episode  20: score 500 
Time for testing: 13 minutes
Score (average): 466.9
Score (max): 500.0
