In [9]:
import gymnasium
import flappy_bird_gymnasium
import numpy as np
import pygame
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
from enum import IntEnum
from flappy_bird_gymnasium.envs.flappy_bird_env import FlappyBirdEnv
from flappy_bird_gymnasium.envs.flappy_bird_env import Actions
from flappy_bird_gymnasium.envs.constants import (
    PLAYER_FLAP_ACC,
    PLAYER_ACC_Y,
    PLAYER_MAX_VEL_Y,
    PLAYER_HEIGHT,
    PLAYER_VEL_ROT,
    PLAYER_WIDTH,
    PIPE_WIDTH,
    PIPE_VEL_X,
)



In [10]:

# Neural Network Model for Q-value approximation
class DQNCNN(nn.Module):
    """Small 1-D convolutional network mapping an observation to Q-values.

    Args:
        input_dim: Length of the flat observation vector.
        action_space: Number of discrete actions, i.e. the output width.
    """

    def __init__(self, input_dim, action_space):
        super().__init__()
        hidden_channels = 64
        # kernel 3 / stride 1 / padding 1 keeps the sequence length unchanged,
        # which is why the first linear layer sees hidden_channels * input_dim.
        self.conv1 = nn.Conv1d(
            in_channels=1,
            out_channels=hidden_channels,
            kernel_size=3,
            stride=1,
            padding=1,
        )
        self.conv2 = nn.Conv1d(
            in_channels=hidden_channels,
            out_channels=hidden_channels,
            kernel_size=3,
            stride=1,
            padding=1,
        )
        self.fc1 = nn.Linear(hidden_channels * input_dim, 128)
        self.fc2 = nn.Linear(128, action_space)

    def forward(self, x):
        """Return Q-values of shape (batch, action_space) for x of shape (batch, input_dim)."""
        features = torch.unsqueeze(x, 1)  # insert the single input channel
        for conv in (self.conv1, self.conv2):
            features = conv(features).relu()
        flat = features.flatten(1)  # (batch, channels * input_dim)
        hidden = self.fc1(flat).relu()
        return self.fc2(hidden)


# Replay Memory to store experiences
class ReplayMemory:
    """Fixed-capacity FIFO store of transitions for experience replay.

    Once ``capacity`` transitions are held, each new one evicts the oldest.
    """

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition as a 5-tuple."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        stored = self.memory
        return len(stored)


# DQN Agent Class
class DQNAgent:
    """Deep Q-Network agent with a replay buffer and a target network.

    Args:
        env: A Gymnasium-style environment with a flat observation space
            (``observation_space.shape[0]``) and a discrete action space
            (``action_space.n``).
        learning_rate: Adam step size for the policy network.
        discount_factor: Discount applied to the bootstrapped next-state value.
        epsilon: Initial exploration rate for epsilon-greedy selection.
        epsilon_decay: Multiplicative decay applied to epsilon after each episode.
        epsilon_min: Lower bound for epsilon.
        batch_size: Number of transitions per optimisation step.
        memory_size: Capacity of the replay buffer.
        episodes: Number of training episodes run by :meth:`train`.
        target_update_freq: Copy policy weights into the target network every
            this many episodes.
    """

    def __init__(
        self,
        env,
        *,
        learning_rate=0.001,
        discount_factor=0.99,
        epsilon=1.0,
        epsilon_decay=0.995,
        epsilon_min=0.01,
        batch_size=64,
        memory_size=10000,
        episodes=500,
        target_update_freq=100,
    ):
        # Environment
        self.env = env
        self.state_dim = env.observation_space.shape[0]  # First dimension of observation
        self.action_space = env.action_space.n

        # Hyperparameters
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon  # Current exploration rate
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.memory_size = memory_size
        self.episodes = episodes
        self.target_update_freq = target_update_freq

        # Initialize policy and target networks with identical weights
        self.policy_net = DQNCNN(self.state_dim, self.action_space)
        self.target_net = DQNCNN(self.state_dim, self.action_space)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        # Optimizer and Replay Memory
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)
        self.memory = ReplayMemory(self.memory_size)

    def select_action(self, state, testing=False):
        """Epsilon-greedy action selection.

        Args:
            state: A single observation (array-like of floats).
            testing: When True, always act greedily regardless of epsilon.

        Returns:
            The chosen action index as a Python int.
        """
        if not testing and random.random() < self.epsilon:
            return random.randint(0, self.action_space - 1)  # Random action

        with torch.no_grad():
            state_batch = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
            return self.policy_net(state_batch).argmax(dim=1).item()

    def optimize_model(self):
        """Sample a batch from memory and take one gradient step on the policy network.

        Does nothing until the replay buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Stack observations with NumPy first: building a tensor directly from
        # a sequence of ndarrays converts element by element and is very slow.
        states = torch.as_tensor(np.asarray(states, dtype=np.float32))
        actions = torch.tensor(actions, dtype=torch.long).unsqueeze(1)
        rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
        next_states = torch.as_tensor(np.asarray(next_states, dtype=np.float32))
        dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)

        # Q(s, a) for the actions that were actually taken
        q_values = self.policy_net(states).gather(1, actions)

        # The target is a constant with respect to the optimisation: do not
        # build an autograd graph through the target network.
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1, keepdim=True)[0]
            targets = rewards + (self.discount_factor * next_q_values * (1 - dones))

        # Loss and backpropagation
        loss = nn.MSELoss()(q_values, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self):
        """Train the agent for ``self.episodes`` episodes, then close ``self.env``."""
        for episode in range(self.episodes):
            state, _ = self.env.reset()
            done = False
            total_reward = 0

            while not done:
                # Select and execute action
                action = self.select_action(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                # The episode ends on either flag; ignoring `truncated` would
                # spin forever on a truncated-but-not-terminated episode.
                done = terminated or truncated
                # Only true termination zeroes the bootstrapped value; a
                # truncated state still has a future worth estimating.
                self.memory.push(state, action, reward, next_state, bool(terminated))
                state = next_state
                total_reward += reward

                # Optimize model
                self.optimize_model()

            # Update target network periodically
            if episode % self.target_update_freq == 0:
                self.target_net.load_state_dict(self.policy_net.state_dict())

            # Decay epsilon
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

            print(f"Episode: {episode + 1}, Total Reward: {total_reward}, Epsilon: {self.epsilon:.4f}")

        self.env.close()
        print("Training complete!")

    def test(self, num_episodes=10):
        """Run the greedy policy with real-time rendering and report rewards.

        A fresh ``FlappyBird-v0`` environment in "human" render mode is created
        for the evaluation and is always closed, even if the run is interrupted.
        The agent's ``epsilon`` is left untouched: greedy behaviour comes from
        ``select_action(..., testing=True)``, so training can be resumed
        afterwards with the exploration schedule intact.

        Args:
            num_episodes: Number of evaluation episodes to play.
        """
        print("\nTesting the trained policy...\n")
        test_env = gymnasium.make("FlappyBird-v0", render_mode="human")  # Render in "human" mode
        total_rewards = []

        try:
            for episode in range(num_episodes):
                state, _ = test_env.reset()
                done = False
                total_reward = 0

                while not done:
                    action = self.select_action(state, testing=True)
                    state, reward, terminated, truncated, _ = test_env.step(action)
                    done = terminated or truncated
                    total_reward += reward

                total_rewards.append(total_reward)
                print(f"Test Episode: {episode + 1}, Total Reward: {total_reward}")
        finally:
            # Release the render window even on KeyboardInterrupt or errors.
            test_env.close()

        avg_reward = np.mean(total_rewards)
        print(f"\nAverage Reward over {num_episodes} Test Episodes: {avg_reward}")

In [11]:
# Build the environment used for training.
env = gymnasium.make("FlappyBird-v0", render_mode="rgb_array")  # use_lidar=True option is also available

# Create and train DQN agent
agent = DQNAgent(env)
agent.train()

# Test the trained agent
agent.test(num_episodes=10)


Episode: 1, Total Reward: -8.099999999999998, Epsilon: 0.9950
Episode: 2, Total Reward: -7.499999999999998, Epsilon: 0.9900
Episode: 3, Total Reward: -8.7, Epsilon: 0.9851
Episode: 4, Total Reward: -6.299999999999999, Epsilon: 0.9801
Episode: 5, Total Reward: -8.7, Epsilon: 0.9752
Episode: 6, Total Reward: -6.899999999999999, Epsilon: 0.9704
Episode: 7, Total Reward: -8.099999999999998, Epsilon: 0.9655
Episode: 8, Total Reward: -8.099999999999998, Epsilon: 0.9607
Episode: 9, Total Reward: -7.499999999999998, Epsilon: 0.9559
Episode: 10, Total Reward: -7.499999999999998, Epsilon: 0.9511
Episode: 11, Total Reward: -6.899999999999999, Epsilon: 0.9464
Episode: 12, Total Reward: -5.099999999999998, Epsilon: 0.9416
Episode: 13, Total Reward: -6.299999999999999, Epsilon: 0.9369
Episode: 14, Total Reward: -8.099999999999998, Epsilon: 0.9322
Episode: 15, Total Reward: -7.499999999999998, Epsilon: 0.9276
Episode: 16, Total Reward: -7.499999999999998, Epsilon: 0.9229
Episode: 17, Total Reward: -6

In [14]:
# Re-run the evaluation on the agent created in the earlier cell.
agent.test(num_episodes=10)


Testing the trained policy...

Test Episode: 1, Total Reward: -8.7
Test Episode: 2, Total Reward: -8.7
Test Episode: 3, Total Reward: -8.7
Test Episode: 4, Total Reward: -8.7
Test Episode: 5, Total Reward: -8.099999999999998
Test Episode: 6, Total Reward: -8.099999999999998


KeyboardInterrupt: 

In [None]:
def env_customization(update_step=0, alive_reward=0.1):
    """Patch ``FlappyBirdEnv.step`` at class level to select a reward scheme.

    The patch is applied to the class, so it affects every ``FlappyBirdEnv``
    instance, including ones that already exist. The library's own ``step`` is
    remembered the first time this function runs, so repeated calls never
    stack wrappers and ``update_step=0`` always restores the library behaviour.

    Args:
        update_step: ``0`` restores the library's original ``step``;
            ``1`` installs the custom ``step`` defined below.
        alive_reward: Reward given by the custom step for a frame in which no
            pipe was passed and ``_get_observation`` reported no reward of
            its own. Only used when ``update_step == 1``.

    Raises:
        ValueError: If ``update_step`` is neither 0 nor 1.
    """
    if update_step not in (0, 1):
        raise ValueError(
            f"Unsupported update_step {update_step!r}; expected 0 or 1"
        )

    # Save the library's step method exactly once. Re-reading
    # FlappyBirdEnv.step on a later call would capture our own patch instead.
    original_step = getattr(FlappyBirdEnv, "_library_step", None)
    if original_step is None:
        original_step = FlappyBirdEnv.step
        FlappyBirdEnv._library_step = original_step

    if update_step == 0:
        FlappyBirdEnv.step = original_step
        return

    def custom_step(self, action):
        """Advance the game by one frame using the customised reward scheme.

        Returns the Gymnasium 5-tuple
        ``(obs, reward, terminated, truncated, info)``.
        """
        terminal = False
        reward = None

        # NOTE: Actions and the PLAYER_*/PIPE_* names are module-level imports
        # from flappy_bird_gymnasium (see the top of the file), not attributes
        # of the environment instance.
        self._sound_cache = None
        if action == Actions.FLAP:
            if self._player_y > -2 * PLAYER_HEIGHT:
                self._player_vel_y = PLAYER_FLAP_ACC
                self._player_flapped = True
                self._sound_cache = "wing"

        # Check for score
        player_mid_pos = self._player_x + PLAYER_WIDTH / 2
        for pipe in self._upper_pipes:
            pipe_mid_pos = pipe["x"] + PIPE_WIDTH / 2
            if pipe_mid_pos <= player_mid_pos < pipe_mid_pos + 4:
                self._score += 1
                reward = 1  # reward for passed pipe
                self._sound_cache = "point"

        # Player movement and rotation
        if self._player_rot > -90:
            self._player_rot -= PLAYER_VEL_ROT
        if self._player_vel_y < PLAYER_MAX_VEL_Y and not self._player_flapped:
            self._player_vel_y += PLAYER_ACC_Y
        if self._player_flapped:
            self._player_flapped = False
            self._player_rot = 45
        # Clamp the vertical move so the player never sinks below the ground.
        self._player_y += min(
            self._player_vel_y, self._ground["y"] - self._player_y - PLAYER_HEIGHT
        )

        # Move pipes, recycling any pair that has scrolled off the left edge
        for up_pipe, low_pipe in zip(self._upper_pipes, self._lower_pipes):
            up_pipe["x"] += PIPE_VEL_X
            low_pipe["x"] += PIPE_VEL_X
            if up_pipe["x"] < -PIPE_WIDTH:
                new_up_pipe, new_low_pipe = self._get_random_pipe()
                up_pipe.update(new_up_pipe)
                low_pipe.update(new_low_pipe)

        if self.render_mode == "human":
            self.render()

        # NOTE(review): relies on _get_observation returning an
        # (observation, reward) pair in the installed library version.
        obs, reward_private_zone = self._get_observation()
        if reward is None:
            if reward_private_zone is not None:
                reward = reward_private_zone
            else:
                reward = alive_reward  # reward for staying alive

        # Agent leaves the playfield through the top or sinks into the ground
        if self._player_y < 0 or self._player_y + PLAYER_HEIGHT > self._ground["y"]:
            reward = -1.0
            terminal = True

        # Check for crash
        if self._check_crash():
            self._sound_cache = "hit"
            reward = -1  # reward for dying
            terminal = True
            self._player_vel_y = 0

        info = {"score": self._score}

        return (
            obs,
            reward,
            terminal,
            (self._score_limit is not None) and (self._score >= self._score_limit),
            info,
        )

    # Override the step method in FlappyBirdEnv
    FlappyBirdEnv.step = custom_step