## Text Flappy Bird
*Romain Mondelice*

The goal of this assignment is to apply reinforcement learning methods to a
simple game called Text Flappy Bird (TFB). The game is a variation of the
well-known Flappy Bird in which the player is a simple unit-element
character.

## General imports

In [None]:
import os, sys
import gymnasium as gym
import time

import numpy as np
from collections import defaultdict
from tqdm import tqdm
import pickle
import random

import text_flappy_bird_gym

## Preprocess env

In [None]:
# Create the environment: a 15x20 playfield with a pipe gap of 4.
env = gym.make('TextFlappyBird-v0', height = 15, width = 20, pipe_gap = 4)
# Gymnasium's reset() returns an (observation, info) pair; unpack it so that
# `obs` holds only the observation.
obs, info = env.reset()

In [None]:
# Play one episode with uniformly random actions, rendering every frame and
# accumulating the reward in `total_reward`.
total_reward = 0
done = False
while not done:
    # Select next action
    action = env.action_space.sample()  # for an agent, action = agent.policy(observation)

    # Apply action and return new observation of the environment
    obs, reward, terminated, truncated, info = env.step(action)
    # The episode is over when the player dies (terminated) or when the
    # environment cuts it short (truncated).
    done = terminated or truncated
    total_reward += reward

    # Render the game (clear the terminal first; `cls` is the Windows equivalent)
    os.system("cls" if os.name == "nt" else "clear")
    sys.stdout.write(env.render())
    time.sleep(0.2)  # pause between frames

In [None]:
# Show the reward accumulated during the random episode, then release the environment.
print(total_reward)
env.close()

## Monte Carlo based agent

In [None]:
class OffPolicyMonteCarloAgent:
    """Tabular off-policy Monte Carlo control agent with weighted importance sampling.

    Episodes are generated with an ε-greedy behaviour policy derived from the
    current action values, while the greedy target policy is improved from the
    observed returns. States are keyed by ``str(observation)``.

    Attributes:
        env: Gymnasium-style environment (``reset``, ``step``, ``action_space``).
        gamma: Discount factor applied to future rewards.
        epsilon: Exploration rate of the ε-greedy behaviour policy.
        Q: Mapping state key -> array of action values.
        C: Mapping state key -> array of cumulative importance weights.
        target_policy: Mapping state key -> greedy action.
    """

    def __init__(self, env, gamma=0.9, epsilon=0.1):
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        # A bound method (rather than a lambda) is used as the default factory
        # so that the agent can still be pickled.
        self.Q = defaultdict(self.zero_action_value)
        self.C = defaultdict(self.zero_action_value)
        self.target_policy = defaultdict(int)

    def zero_action_value(self):
        """Returns a default value for actions, a zero array with the size of the action space."""
        return np.zeros(self.env.action_space.n)

    def generate_episode(self, policy):
        """Play one episode and return it as a list of (state_key, action, reward).

        Args:
            policy: Mapping state key -> action-probability array. States
                missing from the mapping fall back to a uniformly random action.

        Returns:
            The visited transitions in chronological order.
        """
        episode = []
        # Gymnasium's reset() returns (observation, info). Only the observation
        # is the state; stringifying the whole pair would key the first state
        # differently from every later one.
        state, _ = self.env.reset()
        done = False
        while not done:
            # Convert state to a string representation so it can be used as a dict key.
            str_state = str(state)

            if str_state in policy:
                action_probs = policy[str_state]
                action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
            else:
                # Fallback if the state is not in the policy, use uniform random selection
                action = self.env.action_space.sample()

            next_state, reward, terminated, truncated, _ = self.env.step(action)
            # Stop on a natural end (terminated) as well as on a cut-off (truncated).
            done = terminated or truncated
            episode.append((str_state, action, reward))
            state = next_state
        return episode

    def get_probs(self, Q_s, epsilon, nA):
        """Return the ε-greedy action probabilities for one state.

        Args:
            Q_s: Action values of the state.
            epsilon: Exploration rate.
            nA: Number of actions.

        Returns:
            Array of length ``nA``: ``epsilon / nA`` everywhere, plus
            ``1 - epsilon`` on the greedy action.
        """
        policy_s = np.ones(nA) * epsilon / nA
        best_a = np.argmax(Q_s)
        policy_s[best_a] = 1 - epsilon + (epsilon / nA)
        return policy_s

    def update_Q(self, episode):
        """Update Q, C and the greedy target policy from one episode.

        The episode is processed backwards, accumulating the discounted return
        ``G`` and the importance-sampling weight ``W``.

        Args:
            episode: List of (state_key, action, reward) as produced by
                ``generate_episode``.
        """
        n_actions = self.env.action_space.n
        G = 0.0
        W = 1.0
        for state, action, reward in reversed(episode):
            G = self.gamma * G + reward
            self.C[state][action] += W
            self.Q[state][action] += (W / self.C[state][action]) * (G - self.Q[state][action])
            self.target_policy[state] = np.argmax(self.Q[state])

            # The greedy target policy would never have taken this action, so
            # earlier steps carry zero importance weight.
            if action != self.target_policy[state]:
                break
            # NOTE(review): the weight uses the ε-greedy probability computed
            # from the Q-values *after* this update, which may differ from the
            # probability the behaviour policy used when the episode was
            # generated - confirm this approximation is intended.
            W = W * 1. / self.get_probs(self.Q[state], self.epsilon, n_actions)[action]

    def train(self, num_episodes):
        """Run ``num_episodes`` generate-then-update iterations."""
        for _ in range(num_episodes):
            episode = self.generate_episode(policy=self.create_behavior_policy(self.Q))
            self.update_Q(episode)

    def create_behavior_policy(self, Q):
        """Creates a behavior policy using ε-greedy approach based on Q.

        Returns:
            Dict mapping every state key present in ``Q`` to its ε-greedy
            action-probability array.
        """
        n_actions = self.env.action_space.n
        return {
            state: self.get_probs(action_values, self.epsilon, n_actions)
            for state, action_values in Q.items()
        }

In [None]:
def train_monte_carlo_agent(env_str, episodes=1000, gamma=0.9, epsilon=0.1,
                            height=15, width=20, pipe_gap=4):
    """Create an environment and train an off-policy Monte Carlo agent on it.

    Args:
        env_str: Environment id passed to ``gym.make``.
        episodes: Number of training episodes.
        gamma: Discount factor forwarded to the agent.
        epsilon: Exploration rate forwarded to the agent.
        height: Playfield height forwarded to ``gym.make``.
        width: Playfield width forwarded to ``gym.make``.
        pipe_gap: Pipe gap size forwarded to ``gym.make``.

    Returns:
        The trained ``OffPolicyMonteCarloAgent`` (which keeps a reference to
        the environment it was trained on).
    """
    env = gym.make(env_str, height=height, width=width, pipe_gap=pipe_gap)
    agent = OffPolicyMonteCarloAgent(env, gamma=gamma, epsilon=epsilon)

    for _ in tqdm(range(episodes), desc="Training process"):
        # Create the behavior policy from current Q
        behavior_policy = agent.create_behavior_policy(agent.Q)
        # Generate an episode using the behavior policy
        episode = agent.generate_episode(behavior_policy)
        # Update Q-values based on the episode
        agent.update_Q(episode)

    print("Training completed.")
    return agent

In [116]:
def test_agent(agent, episodes=100, reward_threshold=10000):
    """Evaluate an agent's greedy target policy and print reward statistics.

    Args:
        agent: Trained agent exposing ``env`` and ``target_policy`` (a mapping
            from ``str(observation)`` to an action).
        episodes: Number of evaluation episodes.
        reward_threshold: An episode is stopped as soon as its cumulative
            reward strictly exceeds this value, so that a very good agent does
            not play forever.
    """
    episode_scores = []  # Score of each episode

    for _ in tqdm(range(1, episodes + 1), desc="Testing episodes"):
        # Gymnasium's reset() returns (observation, info); keep only the
        # observation so the first state can be looked up in the policy.
        state, _ = agent.env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Convert state to a string representation for consistency.
            str_state = str(state)

            # Use the target_policy for action selection if this state has been seen.
            # Otherwise, select a random action.
            if str_state in agent.target_policy:
                action = agent.target_policy[str_state]
            else:
                action = agent.env.action_space.sample()

            state, reward, terminated, truncated, _ = agent.env.step(action)
            # Stop on a natural end (terminated) as well as on a cut-off (truncated).
            done = terminated or truncated
            episode_reward += reward

            # Check if the reward threshold for this episode has been exceeded
            if episode_reward > reward_threshold:
                break

        # Episode is done or threshold exceeded, record its total reward
        episode_scores.append(episode_reward)

    total_rewards = sum(episode_scores)
    # Avoid dividing by zero when no episode was played.
    avg_reward = total_rewards / len(episode_scores) if episode_scores else 0.0
    print("Total reward across all episodes: ", total_rewards)
    print(f"Average Reward over {episodes} episodes: {avg_reward}")


### Text Flappy Bird Screen env

#### Training

In [None]:
# Train a Monte Carlo agent on the 'TextFlappyBird-screen-v0' environment for 5000 episodes.
trained_agent = train_monte_carlo_agent('TextFlappyBird-screen-v0', episodes=5000)

#### Testing

In [None]:
# Test the trained Monte Carlo agent with the default settings
# (100 episodes, reward threshold of 10000).
test_agent(trained_agent)

#### Save agent

In [None]:
# Persist the trained Monte Carlo agent (`trained_agent`) to disk.
agent_path = '../agents/mc-agent-screen.pkl'
# Create the target directory first so that a missing folder does not make the
# save fail after a long training run.
os.makedirs(os.path.dirname(agent_path), exist_ok=True)
with open(agent_path, 'wb') as f:
    pickle.dump(trained_agent, f)

### Text Flappy Bird env

#### Training

In [None]:
# Train a Monte Carlo agent on the 'TextFlappyBird-v0' environment for 10000 episodes.
trained_agent = train_monte_carlo_agent('TextFlappyBird-v0', episodes=10000)

#### Testing

Here I have set the reward threshold to 10000 in my function: an episode is stopped as soon as its cumulative reward exceeds 10000, so a single episode can score only just above 10000 (10001 in the run below), and the average reward over 100 episodes is capped at the same value.

If we reach this cap, it means that the model has learned extremely well and can achieve a very high score. We need to stop the test at that point, otherwise it could run for an unbounded amount of time.

In [117]:
# Evaluate the trained agent with the default settings
# (100 episodes, reward threshold of 10000).
test_agent(trained_agent)

Testing episodes: 100%|██████████| 100/100 [00:08<00:00, 11.44it/s]

Total reward across all episodes:  1000100
Average Reward over 100 episodes: 10001.0





#### Save

In [118]:
# Persist the trained Monte Carlo agent (`trained_agent`) to disk.
agent_path = '../agents/mc-agent.pkl'
# Create the target directory first so that a missing folder does not make the
# save fail after a long training run.
os.makedirs(os.path.dirname(agent_path), exist_ok=True)
with open(agent_path, 'wb') as f:
    pickle.dump(trained_agent, f)

## Sarsa based agent

In [None]:
class SarsaLambdaAgent:
    """Tabular Sarsa(λ) agent with accumulating eligibility traces.

    States are keyed by ``str(observation)``. A typical step is: choose an
    action with ``policy``, act in the environment, choose the next action
    with ``policy``, then call ``update_q_values``. Call ``reset_traces`` at
    the start of every episode.

    Attributes:
        env: Gymnasium-style environment; only ``action_space`` is used here.
        lambda_: Trace-decay parameter λ.
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Exploration rate of the ε-greedy policy.
        Q: Mapping state key -> array of action values.
        E: Mapping state key -> array of eligibility traces.
    """

    def __init__(self, env, lambda_, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.env = env
        self.lambda_ = lambda_
        self.alpha = alpha
        self.gamma = gamma
        # Epsilon for the epsilon-greedy policy
        self.epsilon = epsilon
        # Q-values and eligibility traces, created lazily per visited state.
        # A bound method (rather than a lambda) is used as the default factory
        # so that the agent can still be pickled.
        self.Q = defaultdict(self.zero_action_value)
        self.E = defaultdict(self.zero_action_value)

    def zero_action_value(self):
        """Return a zero array with one entry per action."""
        return np.zeros(self.env.action_space.n)

    def reset_traces(self):
        """Clear all eligibility traces (to be called when an episode starts)."""
        self.E.clear()

    def policy(self, observation):
        """Select an action ε-greedily with respect to the current Q-values.

        Args:
            observation: Current environment observation.

        Returns:
            A random action with probability ``epsilon``, otherwise the
            action with the highest Q-value for this observation.
        """
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        return int(np.argmax(self.Q[str(observation)]))

    def update_q_values(self, state, action, reward, next_state, next_action):
        """Apply one Sarsa(λ) update for the transition (s, a, r, s', a').

        Args:
            state: Observation the action was taken in.
            action: Action taken.
            reward: Reward received.
            next_state: Resulting observation.
            next_action: Action chosen for ``next_state``; pass ``None`` when
                the transition ended the episode, so that no value is
                bootstrapped from ``next_state``.
        """
        if next_action is None:
            # Terminal transition: the value of the next state is zero.
            target = reward
        else:
            target = reward + self.gamma * self.Q[str(next_state)][next_action]
        td_error = target - self.Q[str(state)][action]

        # Mark the visited pair as eligible before propagating the error.
        self.update_eligibility_traces(state, action)

        decay = self.gamma * self.lambda_
        for key, trace in self.E.items():
            # Every pair is updated in proportion to its trace, then the
            # trace decays in place.
            self.Q[key] += self.alpha * td_error * trace
            trace *= decay

    def update_eligibility_traces(self, state, action):
        """Increment the (accumulating) trace of the visited state-action pair."""
        self.E[str(state)][action] += 1.0
