In [1]:
import numpy as np
import tensorflow as tf
from datetime import datetime

In [2]:
# Define the grid world environment
GRID_SIZE = 5
NUM_TREASURES = 5

In [3]:
# Create the treasure locations randomly
treasure_locations = np.random.randint(GRID_SIZE, size=(NUM_TREASURES, 2))

In [4]:
# Initialize the Q-network
num_actions = 4
input_shape = (GRID_SIZE, GRID_SIZE, 1)
inputs = tf.keras.Input(shape=input_shape)
x = tf.keras.layers.Flatten()(inputs)
x = tf.keras.layers.Dense(32, activation='relu')(x)
outputs = tf.keras.layers.Dense(num_actions)(x)
q_network = tf.keras.Model(inputs=inputs, outputs=outputs)

In [5]:
# Define the epsilon-greedy exploration strategy
def epsilon_greedy(action_values, epsilon):
    if np.random.rand() < epsilon:
        return np.random.randint(num_actions)
    else:
        return np.argmax(action_values)

In [6]:
# Define the loss function for training the Q-network
loss_fn = tf.keras.losses.MeanSquaredError()

In [7]:
# Define the optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [8]:
# Define the training loop
num_episodes = 1000
epsilon = 1.0  # Initial exploration factor
epsilon_min = 0.01  # Minimum exploration factor
epsilon_decay = 0.99  # Decay rate for exploration factor

In [9]:
# Create a log directory for TensorBoard
log_dir = "logs/" + datetime.now().strftime("%Y%m%d-%H%M%S")
summary_writer = tf.summary.create_file_writer(log_dir)

In [10]:
@tf.function
def train_episode(q_network, optimizer, state, epsilon):
    total_reward = tf.constant(0.0, dtype=tf.float32)
    done = tf.constant(False)

    while not tf.reduce_any(done):
        state_tensor = tf.convert_to_tensor(state[np.newaxis, ...], dtype=tf.float32)
        action_values = q_network(state_tensor)

        action = epsilon_greedy(action_values[0], epsilon)

        next_state = tf.identity(state)
        reward = tf.constant(0.0, dtype=tf.float32)

        next_state = tf.where(tf.equal(action, 0),
                              tf.concat([next_state[:, :, 0:1] - 1, next_state[:, :, 1:]], axis=-1),
                              tf.where(tf.equal(action, 1),
                                       tf.concat([next_state[:, :, 0:1] + 1, next_state[:, :, 1:]], axis=-1),
                                       tf.where(tf.equal(action, 2),
                                                tf.concat([next_state[:, :, 0:1], next_state[:, :, 1:2] - 1], axis=-1),
                                                tf.concat([next_state[:, :, 0:1], next_state[:, :, 1:2] + 1], axis=-1))))

        for i in range(NUM_TREASURES):
            if tf.reduce_all(tf.equal(next_state[:, :, 0], treasure_locations[i, 0])) and tf.reduce_all(tf.equal(next_state[:, :, 1], treasure_locations[i, 1])):
                reward += 1
                treasure_locations[i] = np.array([-1, -1])  # Remove the collected treasure

        with tf.GradientTape() as tape:
            next_state_tensor = tf.convert_to_tensor(next_state[np.newaxis, ...], dtype=tf.float32)
            next_action_values = q_network(next_state_tensor)
            max_next_action_value = tf.reduce_max(next_action_values, axis=-1)
            target = tf.where(done, reward, reward + max_next_action_value)
            target_f = tf.reshape(target, shape=(1, 1))
            predicted_values = tf.reduce_sum(action_values * tf.one_hot(action, num_actions), axis=-1)
            predicted_values_f = tf.reshape(predicted_values, shape=(1, 1))
            loss = loss_fn(target_f, predicted_values_f)

        grads = tape.gradient(loss, q_network.trainable_variables)
        optimizer.apply_gradients(zip(grads, q_network.trainable_variables))

        state = next_state
        total_reward += reward

        done = tf.logical_or(done, tf.equal(reward, NUM_TREASURES))

    return total_reward

In [11]:
# Training loop
for episode in range(num_episodes):
    state = np.zeros((GRID_SIZE, GRID_SIZE, 2), dtype=np.float32)  # Specify dtype as float32
    state[0, 0, 0] = 1  # Agent's initial position

    total_reward = train_episode(q_network, optimizer, state, epsilon)

    # Decay the exploration factor after each episode
    epsilon = max(epsilon_min, epsilon * epsilon_decay)

    with summary_writer.as_default():
        tf.summary.scalar('Total Reward', total_reward, step=episode)
        tf.summary.scalar('Epsilon', epsilon, step=episode)

    print("Episode:", episode, "Total Reward:", total_reward)

ValueError: ignored