In [2]:
import gym
import numpy as np
import os

# Force CPU usage to avoid GPU driver issues
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import tensorflow as tf

import matplotlib.pyplot as plt
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from collections import deque
import random

# Create the environment
env = gym.make('CartPole-v1')

# Define the DQN agent
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer='adam')
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        
        minibatch = random.sample(self.memory, batch_size)
        states = np.array([e[0][0] for e in minibatch])
        actions = np.array([e[1] for e in minibatch])
        rewards = np.array([e[2] for e in minibatch])
        next_states = np.array([e[3][0] for e in minibatch])
        dones = np.array([e[4] for e in minibatch])

        targets = rewards + (1 - dones) * self.gamma * np.max(self.model.predict(next_states, verbose=0), axis=1)
        target_f = self.model.predict(states, verbose=0)
        
        for i, action in enumerate(actions):
            target_f[i][action] = targets[i]
        
        self.model.fit(states, target_f, epochs=1, verbose=0)
        
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Initialize and train
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

episodes = 500  # Reduced for faster testing
batch_size = 32

print("Starting training...")
for e in range(episodes):
    state = env.reset()
    if isinstance(state, tuple):  # Handle newer gym versions
        state = state[0]
    state = np.reshape(state, [1, state_size])
    
    total_reward = 0
    for time in range(500):
        action = agent.act(state)
        step_result = env.step(action)
        
        if len(step_result) == 5:  # Newer gym version
            next_state, reward, done, truncated, _ = step_result
            done = done or truncated
        else:  # Older gym version
            next_state, reward, done, _ = step_result
            
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        
        if done:
            print(f"Episode: {e+1}/{episodes}, Score: {total_reward}, Epsilon: {agent.epsilon:.3f}")
            break
            
        agent.replay(batch_size)

print("Training completed!")

Starting training...
Episode: 1/500, Score: 15.0, Epsilon: 1.000
Episode: 2/500, Score: 37.0, Epsilon: 0.905
Episode: 2/500, Score: 37.0, Epsilon: 0.905
Episode: 3/500, Score: 42.0, Epsilon: 0.737
Episode: 3/500, Score: 42.0, Epsilon: 0.737
Episode: 4/500, Score: 12.0, Epsilon: 0.697
Episode: 4/500, Score: 12.0, Epsilon: 0.697
Episode: 5/500, Score: 18.0, Epsilon: 0.640
Episode: 5/500, Score: 18.0, Epsilon: 0.640
Episode: 6/500, Score: 24.0, Epsilon: 0.570
Episode: 6/500, Score: 24.0, Epsilon: 0.570
Episode: 7/500, Score: 10.0, Epsilon: 0.545
Episode: 7/500, Score: 10.0, Epsilon: 0.545
Episode: 8/500, Score: 11.0, Epsilon: 0.519
Episode: 8/500, Score: 11.0, Epsilon: 0.519
Episode: 9/500, Score: 16.0, Epsilon: 0.481
Episode: 9/500, Score: 16.0, Epsilon: 0.481
Episode: 10/500, Score: 15.0, Epsilon: 0.448
Episode: 10/500, Score: 15.0, Epsilon: 0.448
Episode: 11/500, Score: 11.0, Epsilon: 0.427
Episode: 11/500, Score: 11.0, Epsilon: 0.427
Episode: 12/500, Score: 10.0, Epsilon: 0.408
Episod

KeyboardInterrupt: 

In [None]:
# Test the trained agent
def test_agent(agent, env, num_episodes=5):
    """Test the trained agent and optionally render the environment"""
    test_scores = []
    
    # Set epsilon to 0 for pure exploitation (no random actions)
    original_epsilon = agent.epsilon
    agent.epsilon = 0.0
    
    print("\n" + "="*50)
    print("TESTING TRAINED AGENT")
    print("="*50)
    
    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]
        state = np.reshape(state, [1, agent.state_size])
        
        total_reward = 0
        steps = 0
        
        for time in range(500):  # Max 500 steps per episode
            action = agent.act(state)  # Pure exploitation
            step_result = env.step(action)
            
            if len(step_result) == 5:
                next_state, reward, done, truncated, _ = step_result
                done = done or truncated
            else:
                next_state, reward, done, _ = step_result
            
            state = np.reshape(next_state, [1, agent.state_size])
            total_reward += reward
            steps += 1
            
            if done:
                break
        
        test_scores.append(total_reward)
        print(f"Test Episode {episode + 1}: Score = {total_reward} (survived {steps} steps)")
    
    # Restore original epsilon
    agent.epsilon = original_epsilon
    
    avg_score = np.mean(test_scores)
    print(f"\nAverage Test Score: {avg_score:.2f}")
    print(f"Best Test Score: {max(test_scores)}")
    print(f"Worst Test Score: {min(test_scores)}")
    
    return test_scores

# Create a visual environment for demonstration
def demonstrate_agent(agent, num_episodes=3):
    """Visually demonstrate the trained agent"""
    # Create environment with rendering
    visual_env = gym.make('CartPole-v1', render_mode='human')
    
    # Set epsilon to 0 for pure exploitation
    original_epsilon = agent.epsilon
    agent.epsilon = 0.0
    
    print("\n" + "="*50)
    print("VISUAL DEMONSTRATION - Watch the CartPole!")
    print("="*50)
    print("Close the window to continue to next episode...")
    
    for episode in range(num_episodes):
        state = visual_env.reset()
        if isinstance(state, tuple):
            state = state[0]
        state = np.reshape(state, [1, agent.state_size])
        
        total_reward = 0
        
        for time in range(500):
            visual_env.render()  # Show the environment
            action = agent.act(state)
            step_result = visual_env.step(action)
            
            if len(step_result) == 5:
                next_state, reward, done, truncated, _ = step_result
                done = done or truncated
            else:
                next_state, reward, done, _ = step_result
            
            state = np.reshape(next_state, [1, agent.state_size])
            total_reward += reward
            
            if done:
                print(f"Visual Demo Episode {episode + 1}: Score = {total_reward}")
                break
        
        # Small delay between episodes
        import time
        time.sleep(2)
    
    visual_env.close()
    agent.epsilon = original_epsilon
    print("Visual demonstration completed!")

# Run the tests
print("Running numerical tests first...")
test_scores = test_agent(agent, env, num_episodes=5)

print("\nStarting visual demonstration...")
demonstrate_agent(agent, num_episodes=3)

## Explanation of the DQN Implementation

1. **Environment Setup**: The `CartPole-v1` environment from OpenAI Gym is used to simulate the inverted pendulum problem. The goal is to balance a pole on a cart by applying forces to the cart.

2. **DQN Agent**:
   - The agent is defined with a neural network model that approximates the Q-function.
   - The model has three layers: two hidden layers with 24 neurons each and ReLU activation, and an output layer with linear activation.
   - The agent uses an epsilon-greedy policy for exploration and exploitation.

3. **Experience Replay**:
   - The agent stores past experiences in a replay buffer.
   - During training, random batches are sampled from the buffer to update the Q-values, improving stability and efficiency.

4. **Training Loop**:
   - For each episode, the agent interacts with the environment, selects actions, and stores experiences.
   - The agent trains on batches of experiences from the replay buffer to update its Q-function.
   - The exploration rate (`epsilon`) decays over time to shift from exploration to exploitation.

5. **Performance Metrics**:
   - The score for each episode is printed, along with the current exploration rate.
   - The training continues for 1000 episodes or until the agent achieves satisfactory performance.