In [91]:
import numpy as np
import gym
import tensorflow as tf
from tensorflow.keras import layers


In [92]:
import random
from collections import deque


class DQNAgent:
    def __init__(self, state_size, action_size, learning_rate=0.001, gamma=0.99,
                 epsilon=0.1, epsilon_decay=0.995, epsilon_min=0.01,
                 buffer_size=10000, batch_size=32):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.gamma = gamma  # discount rate
        self.epsilon = epsilon  # exploration rate
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.model = self._build_model()

    def _build_model(self):
        """Neural Net for Deep-Q learning Model."""
        model = tf.keras.Sequential([
            layers.Dense(24, activation='relu', input_dim=self.state_size),
            layers.Dense(24, activation='relu'),
            layers.Dense(self.action_size, activation='linear')
        ])
        model.compile(loss='mean_squared_error',
                      optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.randint(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])

    def replay(self):
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        states, targets_f = [], []
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                next_state = np.reshape(next_state, (1, self.state_size))
                target += self.gamma * np.amax(self.model.predict(next_state)[0])
            state = np.reshape(state, (1, self.state_size))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            states.append(state.squeeze())  # remove the batch dimension for fitting
            targets_f.append(target_f[0])
        self.model.train_on_batch(np.array(states), np.array(targets_f))


    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)



# Create the env and init the agent
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size=state_size, action_size=action_size)

episodes = 1000  # Define the number of episodes for training
for e in range(episodes):
    state = env.reset()[0]
    state = np.reshape(state, [1, state_size])
    total_reward = 0

    while True:
        action = agent.choose_action(state)
        next_state, reward, done, info, _ = env.step(action)
        next_state = np.reshape(next_state, [1, state_size])
        
        # Store transition in replay buffer
        agent.remember(state, action, reward, next_state, done)
        
        state = next_state
        total_reward += reward
        
        if done:
            print(f"Episode: {e+1}/{episodes}, Score: {total_reward}, Epsilon: {agent.epsilon:.2f}")
            break

        # Train the agent with the experience of the episode
        agent.replay()

    # Optionally save the model
#      if (e + 1) % 50 == 0:
#        agent.save(f'cartpole_model_{e+1}.h5')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
Episode: 1/1000, Score: 12.0, Epsilon: 0.10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━

KeyboardInterrupt: 

# Create the environment
env = gym.make('CartPole-v1')

# Model definition using TensorFlow
def create_model():
    model = tf.keras.Sequential([
        layers.Dense(24, activation='relu'),
        layers.Dense(24, activation='relu'),
        layers.Dense(env.action_space.n, activation='linear')
    ])
    return model

model = create_model()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_function = tf.keras.losses.MeanSquaredError()

# Action selection function
def choose_action(state, epsilon):
    if np.random.rand() < epsilon:
        return env.action_space.sample()
    else:
        state_tensor = tf.convert_to_tensor([state], dtype=tf.float32)  # Change here
        q_values = model(state_tensor)
        return np.argmax(q_values.numpy())

# Update train_step function to handle data types appropriately
@tf.function
def train_step(gamma,state_batch, action_batch, reward_batch, next_state_batch, done_batch):
    with tf.GradientTape() as tape:
        q_values = model(state_batch, training=True)
        action_indices = tf.stack([tf.range(action_batch.shape[0]), action_batch], axis=1)
        predicted_q = tf.gather_nd(q_values, indices=action_indices)

        future_q = model(next_state_batch)
        target_q = reward_batch + (1. - done_batch) * gamma * tf.reduce_max(future_q, axis=1)

        loss = loss_function(target_q, predicted_q)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

# Main training loop
def main():
    episodes = 500
    gamma = 0.99
    epsilon = 0.1
    min_epsilon = 0.01
    epsilon_decay = 0.995
    batch_size = 32
    buffer = []

    for episode in range(episodes):
        state_tuple = env.reset()
        state  = state_tuple[0]
        total_reward = 0
        done = False
        while not done:
            action = choose_action(state, epsilon)
            next_state, reward, done, info, _ = env.step(action)
            buffer.append((state, action, reward, next_state, done))
            state = next_state
            total_reward += reward

            if len(buffer) > batch_size:
                batch = np.random.choice(len(buffer), batch_size, replace=False)
                batch = [buffer[i] for i in batch]
                state_batch, action_batch, reward_batch, next_state_batch, done_batch = map(np.array, zip(*batch))
                
                state_batch = np.array(state_batch, dtype=np.float32)
                action_batch = np.array(action_batch, dtype=np.int32)
                reward_batch = np.array(reward_batch, dtype=np.float32)  # Make sure this is correct as shown above
                next_state_batch = np.array(next_state_batch, dtype=np.float32)
                done_batch = np.array(done_batch, dtype=np.float32)  # Typically this should be boolean or float for masking

                train_step(gamma,
                           tf.convert_to_tensor(state_batch, dtype=tf.float32),
                           tf.convert_to_tensor(action_batch, dtype=tf.int32),
                           tf.convert_to_tensor(reward_batch, dtype=tf.float32),
                           tf.convert_to_tensor(next_state_batch, dtype=tf.float32),
                           tf.convert_to_tensor(done_batch, dtype=tf.float32))  # Assuming this needs to be a float for computational reasons

        epsilon = max(min_epsilon, epsilon * epsilon_decay)
        print(f"Episode {episode} - Total Reward: {total_reward}, Epsilon: {epsilon}")

if __name__ == "__main__":
    main()

