In [None]:
import os, sys
import gymnasium as gym
import time
from collections import defaultdict
import random as rd 
import numpy as np
import text_flappy_bird_gym


# Q-learning agent

## Implementation

In [None]:
class QLearningAgent():
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

    Action values live in a ``defaultdict`` keyed by observation, so any
    hashable observation works and unseen states start with all-zero values.
    """

    def __init__(self,env,epsilon,alpha,gamma,height = 15, width = 20):
        """Set up the agent; called once before training starts.

        Args:
            env: environment exposing ``action_space.n`` (number of actions).
            epsilon (float): probability of picking a uniformly random action.
            alpha (float): step size of the value update.
            gamma (float): discount factor.
            height (int): only used to compute ``num_states``.
            width (int): only used to compute ``num_states``.
        """
        # Take the action count from the environment so that random
        # exploration and the Q-table rows always agree on the number of actions.
        self.num_actions = int(env.action_space.n)
        self.num_states = height*width
        self.epsilon = epsilon
        self.step_size = alpha
        self.discount = gamma
        # Fixed seed: exploration and tie-breaking are reproducible.
        self.rand_generator = np.random.RandomState(12)

        # Action-value table; a row of zeros is created on first access.
        num_actions = self.num_actions
        self.q = defaultdict(lambda: np.zeros(num_actions))

    def _select_action(self, state):
        """Return an epsilon-greedy action for ``state``."""
        q_values = self.q[state]
        if self.rand_generator.rand() < self.epsilon:
            return self.rand_generator.randint(self.num_actions)  # explore
        return self.argmax(q_values)  # exploit

    def agent_start(self, state):
        """Choose the first action of an episode.

        Args:
            state (hashable): the observation returned by the environment's
                reset.
        Returns:
            action (int): the first action the agent takes.
        """
        action = self._select_action(state)
        self.prev_state = state
        self.prev_action = action
        return action

    def agent_step(self, reward, state):
        """Learn from a non-terminal transition and choose the next action.

        Args:
            reward (float): the reward received for the last action taken.
            state (hashable): the observation the agent ended up in after the
                last action.
        Returns:
            action (int): the action the agent is taking next.
        """
        action = self._select_action(state)

        # Q-learning target: bootstrap from the best action in the new state,
        # regardless of which action was actually selected.
        target = reward + self.discount*np.max(self.q[state])
        prev_q = self.q[self.prev_state]
        prev_q[self.prev_action] += self.step_size*(target - prev_q[self.prev_action])

        self.prev_state = state
        self.prev_action = action
        return action

    def agent_end(self, reward):
        """Learn from the transition that ended the episode.

        Args:
            reward (float): the reward the agent received for entering the
                terminal state.
        """
        # No bootstrap term: the update target is the reward alone.
        prev_q = self.q[self.prev_state]
        prev_q[self.prev_action] += self.step_size*(reward - prev_q[self.prev_action])

    def argmax(self, q_values):
        """argmax with random tie-breaking.

        Args:
            q_values (Numpy array): the array of action-values.
        Returns:
            action (int): an action with the highest value, drawn uniformly
                among the maximisers with the agent's own random generator.
        """
        best_actions = np.flatnonzero(q_values == np.max(q_values))
        return self.rand_generator.choice(best_actions)

    def training(self, num_episodes,env):
        """Run ``num_episodes`` episodes on ``env``, updating ``self.q`` online.

        Args:
            num_episodes (int): number of episodes to run.
            env: environment whose ``reset()`` returns ``(obs, info)`` and
                whose ``step(action)`` returns
                ``(obs, reward, terminated, truncated, info)``.
        """
        for i_episode in range(1, num_episodes+1):
            # monitor progress
            if i_episode % 1000 == 0:
                print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")

            obs, _ = env.reset()
            action = self.agent_start(obs)

            while True:
                obs, reward, terminated, truncated, _ = env.step(action)
                if terminated:
                    # The terminal transition gets the terminal update only.
                    # Calling agent_step here would bootstrap from the
                    # terminal observation and then write the final reward
                    # into that observation's own Q-values.
                    self.agent_end(reward)
                    break
                action = self.agent_step(reward,obs)
                if truncated:
                    # Episode cut off without terminating: the bootstrapped
                    # update above is the last one for this episode.
                    break

    def policy(self,obs):
        """Return a greedy action for ``obs`` (ties broken at random)."""
        return self.argmax(self.q[obs])

    def get_policy(self):
        """Store the greedy action of every visited state in ``self.p``.

        Returns:
            dict: the mapping that was stored in ``self.p``.
        """
        self.p = {state: self.argmax(values) for state, values in self.q.items()}
        return self.p

    def get_valuefunction(self):
        """Return a dict mapping each visited state to its largest action value."""
        return {state: np.max(values) for state, values in self.q.items()}

# Sarsa Agent

## Implementation

In [None]:
class SarsaAgent():
    """Tabular Sarsa agent with an epsilon-greedy policy.

    Action values live in a ``defaultdict`` keyed by observation, so any
    hashable observation works and unseen states start with all-zero values.
    """

    def __init__(self,env,epsilon,alpha,gamma,height = 15, width = 20):
        """Set up the agent; called once before training starts.

        Args:
            env: environment exposing ``action_space.n`` (number of actions).
            epsilon (float): probability of picking a uniformly random action.
            alpha (float): step size of the value update.
            gamma (float): discount factor.
            height (int): only used to compute ``num_states``.
            width (int): only used to compute ``num_states``.
        """
        # Take the action count from the environment so that random
        # exploration and the Q-table rows always agree on the number of actions.
        self.num_actions = int(env.action_space.n)
        self.num_states = height*width
        self.epsilon = epsilon
        self.step_size = alpha
        self.discount = gamma
        # Fixed seed: exploration and tie-breaking are reproducible.
        self.rand_generator = np.random.RandomState(12)

        # Action-value table; a row of zeros is created on first access.
        num_actions = self.num_actions
        self.q = defaultdict(lambda: np.zeros(num_actions))

    def _select_action(self, state):
        """Return an epsilon-greedy action for ``state``."""
        q_values = self.q[state]
        if self.rand_generator.rand() < self.epsilon:
            return self.rand_generator.randint(self.num_actions)  # explore
        return self.argmax(q_values)  # exploit

    def agent_start(self, state):
        """Choose the first action of an episode.

        Args:
            state (hashable): the observation returned by the environment's
                reset.
        Returns:
            action (int): the first action the agent takes.
        """
        action = self._select_action(state)
        self.prev_state = state
        self.prev_action = action
        return action

    def agent_step(self, reward, state):
        """Learn from a non-terminal transition and choose the next action.

        Args:
            reward (float): the reward received for the last action taken.
            state (hashable): the observation the agent ended up in after the
                last action.
        Returns:
            action (int): the action the agent is taking next.
        """
        action = self._select_action(state)

        # Sarsa target: bootstrap from the action that was just selected and
        # will actually be taken in the new state.
        target = reward + self.discount*self.q[state][action]
        prev_q = self.q[self.prev_state]
        prev_q[self.prev_action] += self.step_size*(target - prev_q[self.prev_action])

        self.prev_state = state
        self.prev_action = action
        return action

    def agent_end(self, reward):
        """Learn from the transition that ended the episode.

        Args:
            reward (float): the reward the agent received for entering the
                terminal state.
        """
        # No bootstrap term: the update target is the reward alone.
        prev_q = self.q[self.prev_state]
        prev_q[self.prev_action] += self.step_size*(reward - prev_q[self.prev_action])

    def argmax(self, q_values):
        """argmax with random tie-breaking.

        Args:
            q_values (Numpy array): the array of action-values.
        Returns:
            action (int): an action with the highest value, drawn uniformly
                among the maximisers with the agent's own random generator.
        """
        best_actions = np.flatnonzero(q_values == np.max(q_values))
        return self.rand_generator.choice(best_actions)

    def training(self, num_episodes,env):
        """Run ``num_episodes`` episodes on ``env``, updating ``self.q`` online.

        Args:
            num_episodes (int): number of episodes to run.
            env: environment whose ``reset()`` returns ``(obs, info)`` and
                whose ``step(action)`` returns
                ``(obs, reward, terminated, truncated, info)``.
        """
        for i_episode in range(1, num_episodes+1):
            # monitor progress
            if i_episode % 1000 == 0:
                print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")

            obs, _ = env.reset()
            action = self.agent_start(obs)

            while True:
                obs, reward, terminated, truncated, _ = env.step(action)
                if terminated:
                    # The terminal transition gets the terminal update only.
                    # Calling agent_step here would bootstrap from the
                    # terminal observation and then write the final reward
                    # into that observation's own Q-values.
                    self.agent_end(reward)
                    break
                action = self.agent_step(reward,obs)
                if truncated:
                    # Episode cut off without terminating: the bootstrapped
                    # update above is the last one for this episode.
                    break

    def policy(self,obs):
        """Return a greedy action for ``obs`` (ties broken at random)."""
        return self.argmax(self.q[obs])

    def get_policy(self):
        """Store the greedy action of every visited state in ``self.p``.

        Returns:
            dict: the mapping that was stored in ``self.p``.
        """
        self.p = {state: self.argmax(values) for state, values in self.q.items()}
        return self.p

    def get_valuefunction(self):
        """Return a dict mapping each visited state to its largest action value."""
        return {state: np.max(values) for state, values in self.q.items()}

# Test agent

## Initialize the environment

In [None]:
# Build the text-based Flappy Bird environment used by the cells below.
env = gym.make(
    'TextFlappyBird-v0',
    height=15,
    width=20,
    pipe_gap=4,
)

## Initialize and train the agent

In [None]:
# Create a Q-learning agent for the environment and train it for 50000 episodes.
agent = QLearningAgent(
    env,
    epsilon=0.1,
    alpha=0.1,
    gamma=1.0,
    height=15,
    width=20,
)
agent.training(num_episodes=50000, env=env)

print("done training")

In [None]:
# Start a fresh episode; reset() returns a tuple whose first item is the observation.
obs = env.reset()[0]

# Per-step history of the rollout (scores starts with the initial score of 0).
actions, observations, rewards = [], [], []
scores = [0]

# Follow the agent's policy until the player is dead or the score reaches 10000.
done = False
while not done and scores[-1] < 10000:
    # Select the next action from the trained agent.
    action = agent.policy(obs)
    actions.append(action)

    # Apply the action and record the resulting observation, reward and score.
    obs, reward, done, _, info = env.step(action)
    scores.append(info['score'])
    observations.append(obs)
    rewards.append(reward)

env.close()

print("actions",actions)
print("obs",observations)
print("rewards",rewards)
print("scores",scores)