<a href="https://colab.research.google.com/github/stavco9/reinforcment-midterm/blob/main/FlappyBirdMidTerm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installs

## Imports

In [1]:
import sys
import os
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    # Run just in colab
    !git clone https://github.com/ntasfi/PyGame-Learning-Environment
    !pip install gym_ple
    !pip install -e ./PyGame-Learning-Environment
    !pip install pyvirtualdisplay
    !sudo apt-get install -y xvfb ffmpeg freeglut3-dev

Cloning into 'PyGame-Learning-Environment'...
remote: Enumerating objects: 1118, done.
remote: Total 1118 (delta 0), reused 0 (delta 0), pack-reused 1118 (from 1)
Receiving objects: 100% (1118/1118), 8.06 MiB | 22.54 MiB/s, done.
Resolving deltas: 100% (592/592), done.
Collecting gym_ple
  Downloading gym_ple-0.3.tar.gz (4.0 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: gym_ple
  Building wheel for gym_ple (setup.py) ... done
  Created wheel for gym_ple: filename=gym_ple-0.3-py3-none-any.whl size=4458 sha256=301a67ffb7db569cdb3d9d32813882b95591ed2bf07085a28e977359977ebc5f
  Stored in directory: /root/.cache/pip/wheels/71/96/17/a59c56c130c89ad5816ef952c93cad65803807f1a3891ba994
Successfully built gym_ple
Installing collected packages: gym_ple
Successfully installed gym_ple-0.3
Obtaining file:///content/PyGame-Learning-Environment
  Preparing metadata (setup.py) ... done
Installing collected packages: ple
  

In [2]:
if IN_COLAB:
    game_path = '/content'
else:
    game_path = '/Users/scohen6/projects/idc_msc/reinforcment-learning/reinforcment-midterm'

sys.path.append(os.path.join(game_path, 'PyGame-Learning-Environment'))
from ple.games.flappybird import FlappyBird
from ple import PLE
from gym import spaces
from statistics import mean
import IPython
import numpy as np
import gym
import os
import pyvirtualdisplay
import base64
import warnings
import imageio
from abc import ABC, abstractmethod
warnings.filterwarnings("ignore")


pygame 2.6.1 (SDL 2.28.4, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html
couldn't import doomish
Couldn't import doom


In [3]:
import random
import json

## Utility function to display episode

In [9]:
def embed_mp4(filename):
  """Embeds an mp4 file in the notebook."""
  # Read the file inside a context manager so the handle is always closed
  with open(filename, 'rb') as f:
    video = f.read()
  b64 = base64.b64encode(video)
  tag = '''
  <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
  Your browser does not support the video tag.
  </video>'''.format(b64.decode())

  return IPython.display.HTML(tag)
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

## Environment

### Environment Summary

This environment simulates the *Flappy Bird* game, where the agent controls the bird’s vertical position to navigate through gaps in pipes. Built on OpenAI’s `gym.Env` class, the environment allows interaction through standard functions like `step`, `reset`, and `render`.

**Key Components**

- **Observation Space**: The observation space includes 8 integer values, each representing a different aspect of the game state. These include:
  - `"player_y"`: The bird's vertical position.
  - `"player_vel"`: The bird's vertical velocity.
  - `"next_pipe_dist_to_player"`: Horizontal distance to the next pipe.
  - Additional features: Positions of the top and bottom of the next two pipes, enabling the agent to anticipate upcoming gaps.

- **Action Space**: This is a discrete space with two actions:
  - `0`: No flap (bird falls due to gravity).
  - `1`: Flap (bird moves upward).
- **Using the Environment**

  - **Step:**
    The `step` function returns four parameters: `obs`, `reward`, `done`, and `info`.

    - **`obs`**: The new observation after applying the action.
    - **`reward`**: The reward received for the action.
    - **`done`**: A boolean indicating whether the episode has ended.
    - **`info`**: A dictionary containing additional information. In this environment, the `info` dictionary includes a `"score"` entry that indicates how many pipes the bird has passed since the beginning of the episode.
  - **Reset**
    The `reset` function restores the environment to its initial state and returns the observation of that state.

  - **Render**
    The `render` function returns an image of the current state as a NumPy array (`np.array`).

  See the `Running the environment` section below for an example of how to use it.

- **Customizable Parameters**:
  - **`pipe_gap`**: This controls the vertical gap between pipes. The default is set to `80`, providing a moderate challenge.
  - **`custom_obs`** and **`custome_observation_map`**: Setting `custom_obs=True` instructs the environment to use a custom `observation_map` specified by the user, rather than the default map. When using a custom observation map, the user must also provide a `preprocess` function compatible with the custom observation map to ensure the observations are correctly processed for the agent.

  - **`preprocess`** and **`reward_shaping`**: Optional functions that allow the user to customize how observations and rewards are modified. The `preprocess` function transforms the raw observations according to the specified `observation_map`, while `reward_shaping` allows for custom adjustments to the reward structure, helping to align the environment with the agent’s learning goals.

  See the `Custom observation and reward shaping example of usage` section below for an example of usage.






In [4]:
# to disable the python game window popup
os.environ["SDL_VIDEODRIVER"] = "dummy"

OBSERVATION_MAP = {0: 'player_y',
                   1: "player_vel",
                   2: "next_pipe_dist_to_player",
                   3: "next_pipe_top_y",
                   4: "next_pipe_bottom_y",
                   5: "next_next_pipe_dist_to_player",
                   6: "next_next_pipe_top_y",
                   7: "next_next_pipe_bottom_y",
}

class Game(gym.Env):
    def __init__(self, display_screen=False,
                 force_fps=True,
                 custom_obs=False,
                 pipe_gap=80,
                 custome_observation_map={},
                 preprocess=lambda x: x,
                 reward_shaping=lambda x, y, z: x):

        os.environ["SDL_VIDEODRIVER"] = "dummy"
        game = FlappyBird(pipe_gap=pipe_gap)  # define and initiate the environment
        self.env = PLE(game, fps=30, display_screen=display_screen,
                       force_fps=force_fps)
        self.env.init()
        # list of actions in the environment
        self.actions = self.env.getActionSet()
        # length of actions
        self.action_space = spaces.Discrete(len(self.actions))
        self.custom_obs = custom_obs
        self._observation_map = custome_observation_map if custom_obs else OBSERVATION_MAP
        self.preprocess = preprocess
        self.reward_shaping = reward_shaping
        self.score = 0

    @property
    def observation_space(self):
        return spaces.Box(low=0, high=512, shape=(len(self._observation_map),), dtype=int)

    def _get_rgb(self):
        return self.env.getScreenRGB().transpose(1, 0, 2)

    @property
    def observation_map(self):
        return self._observation_map

    def step(self, action):
        """Take the action chosen and update the reward"""
        reward = self.env.act(self.actions[action])
        if reward < 0:
          reward = -1

        if reward > 0:
          self.score += 1
        state = self.env.getGameState()
        terminal = self.env.game_over()
        # Pass this Game instance rather than relying on a global `env` variable
        reward = self.reward_shaping(reward, terminal, self)
        info = {'score':self.score}
        return self.preprocess(np.array(list(state.values())).astype(int)), reward, terminal, info

    def getGameState(self):
        '''
        PLEenv return gamestate as a dictionary. Returns a modified form
        of the gamestate only with the required information to define the state
        '''
        state = self.env.getGameState()
        h_dist = state['next_pipe_dist_to_player']
        v_dist = state['next_pipe_bottom_y'] - state['player_y']
        vel = state['player_vel']

        return ' '.join([str(vel), str(h_dist), str(v_dist)])

    def reset(self):
        """Resets the game to start a new game"""
        self.env.reset_game()
        state = self.env.getGameState()
        self.score = 0
        return self.preprocess(np.array(list(state.values())).astype(int))

    def render(self, mode='human'):
        """Render the game"""
        return self._get_rgb()

    def seed(self, seed):
        rng = np.random.RandomState(seed)
        self.env.rng = rng
        self.env.game.rng = self.env.rng

        self.env.init()

In [5]:
env = Game(custom_obs=False)

### Observation space

In [6]:
print(f"observation space: {env.observation_space}")
for feature, des in env.observation_map.items():
  print(f"feature: {feature} -> {des}")

observation space: Box(0, 512, (8,), int64)
feature: 0 -> player_y
feature: 1 -> player_vel
feature: 2 -> next_pipe_dist_to_player
feature: 3 -> next_pipe_top_y
feature: 4 -> next_pipe_bottom_y
feature: 5 -> next_next_pipe_dist_to_player
feature: 6 -> next_next_pipe_top_y
feature: 7 -> next_next_pipe_bottom_y


In this *Flappy Bird* environment, the observation space is a `Box` space from OpenAI's Gym: a bounded, n-dimensional array space. It is used here with an integer dtype, so every dimension takes whole-number values. Here’s what each part means:

* **Box**: This type of space defines a lower and an upper bound for each observation dimension (unlike `Discrete`, which enumerates a single finite set of values). Here, it represents an 8-dimensional vector.
* **(0, 512)**: Each dimension within this space can take values from 0 to 512, which corresponds to the pixel boundaries of the game.
* **(8,)**: This shape indicates that there are 8 distinct variables in the observation vector, each capturing a critical feature of the environment.
* **int64**: Each value in the vector is an integer of 64-bit precision.

In summary, `Box(0, 512, (8,), int64)` represents an 8-dimensional vector space where each element is an integer from 0 to 512. Each dimension corresponds to a specific aspect of the *Flappy Bird* environment:

1. **Player Y Position**: The vertical position of the bird, representing its height on the screen.
2. **Player Velocity**: The bird’s current velocity, which impacts its movement and helps determine when to flap.
3. **Distance to Next Pipe**: The horizontal distance between the bird and the next pipe, crucial for timing flaps.
4. **Next Pipe Top Y Position**: The vertical position of the top of the next pipe, helping the bird gauge the gap.
5. **Next Pipe Bottom Y Position**: The vertical position of the bottom of the next pipe, providing further data for navigating the gap.
6. **Distance to Next-Next Pipe**: The horizontal distance to the pipe after the next one, giving the bird foresight into upcoming obstacles.
7. **Next-Next Pipe Top Y Position**: The vertical position of the top of the pipe after the next one, extending the bird's view of upcoming gaps.
8. **Next-Next Pipe Bottom Y Position**: The vertical position of the bottom of the pipe after the next one, for additional planning.

Together, these dimensions offer the agent a detailed snapshot of the environment, helping it make decisions based on its position, velocity, and the layout of both current and upcoming obstacles.
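
As a small illustration of the mapping above, the following sketch turns an observation vector into a name-to-value dictionary and derives the centre of the next gap. The helper `describe_observation` and the sample values are hypothetical and are not part of the environment; the map itself is copied from the notebook so the snippet runs on its own.

```python
# Copy of the notebook's OBSERVATION_MAP so this snippet is self-contained.
OBSERVATION_MAP = {
    0: "player_y",
    1: "player_vel",
    2: "next_pipe_dist_to_player",
    3: "next_pipe_top_y",
    4: "next_pipe_bottom_y",
    5: "next_next_pipe_dist_to_player",
    6: "next_next_pipe_top_y",
    7: "next_next_pipe_bottom_y",
}


def describe_observation(obs, observation_map=OBSERVATION_MAP):
    """Return a {feature_name: value} dict for one observation vector.

    Hypothetical helper: works with a list or a NumPy array of length 8.
    """
    return {name: int(obs[idx]) for idx, name in observation_map.items()}


# Made-up observation, for illustration only (not taken from a real episode).
sample_obs = [256, 0, 309, 144, 224, 453, 160, 240]
named = describe_observation(sample_obs)

# Vertical centre of the next gap, halfway between the top and bottom edges.
gap_center = (named["next_pipe_top_y"] + named["next_pipe_bottom_y"]) / 2
print(named["player_y"], gap_center)
```

Working with names rather than indices makes later preprocessing code easier to read, at the cost of a small per-step conversion.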


### Action space

In [7]:
env.action_space.seed(47457)

[47457]

In this *Flappy Bird* environment, the `Discrete(2)` action space defines the possible actions the agent can take, using a finite set of discrete values:

* **Discrete**: This type of space is used when there are a limited number of distinct actions.
* **2**: The number of actions available, represented by values 0 and 1.

For *Flappy Bird*, these actions correspond to:
  * **0**: Do nothing (the bird continues to fall due to gravity).
  * **1**: Flap (the bird moves upward briefly).

In summary, `Discrete(2)` defines an action space with two possible actions: allowing the bird to either continue falling or flap to move upward, which is essential for navigating the gaps between pipes.


### Rewards

*Reward Structure:*



*   **Positive Reward**: For each pipe successfully passed, the agent receives a reward of **+1**.
*   **Negative Reward**: Upon reaching any terminal state, the agent receives a penalty of **-1**.
*   **Zero Reward**: In any other scenario, the reward is **0**.
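
The reward structure above follows from the two checks in `Game.step`: any negative raw reward is collapsed to -1, and every positive reward increments the score. The sketch below mirrors that logic outside the environment. The function names and the raw reward values in `raw_rewards` are assumptions made for illustration, not values read from the game.

```python
def normalize_reward(raw_reward):
    """Collapse any negative raw reward to -1; leave other rewards unchanged."""
    if raw_reward < 0:
        return -1
    return raw_reward


def count_score(raw_rewards):
    """Count the steps with a positive reward, i.e. the pipes passed."""
    return sum(1 for r in raw_rewards if r > 0)


# Hypothetical raw rewards for a five-step episode: one pipe passed, then a crash.
raw_rewards = [0, 0, 1.0, 0, -5.0]

normalized = [normalize_reward(r) for r in raw_rewards]
episode_return = sum(normalized)
score = count_score(raw_rewards)
print(normalized, episode_return, score)
```

An episode that passes one pipe and then crashes therefore has a return of 0 and a score of 1, which is why the score is the more informative metric.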




### Running the environment

In [10]:
env = Game()
env.seed(42)
obs = env.reset()
video_filename = os.path.join(game_path, 'vid.mp4')
max_steps = 100
# Evaluation
with imageio.get_writer(video_filename, fps=24) as video:
  obs = env.reset()
  done = False
  total_reward = 0
  for step in range(max_steps):
      action = env.action_space.sample()
      obs, reward, done, info = env.step(action)
      next_obs = obs  # Keep the latest observation (not used further in this demo)
      total_reward += reward
      re = env.render()
      video.append_data(re)
      if done:
        score = info['score']
        print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
        break
embed_mp4(video_filename)

Sum of rewards = -1.0 , Number of steps= 48 score:  0


### Custom observation and reward shaping example of usage

In [11]:
def custom_observation():
    custom_obs_mapping = {0: "average",
                          1: 'sum'}

    # Meaningless preprocess function that averages and sums all of the original features
    preprocess = lambda x: np.array([np.mean(x), np.sum(x)])

    # Meaningless reward shaping function that adds 1 to each original reward
    reward_shaping = lambda x, y, z: x+1

    env = Game(custom_obs=True,
               custome_observation_map=custom_obs_mapping,
               preprocess=preprocess,
               reward_shaping=reward_shaping)
    env.seed(42)
    obs = env.reset()

    print(f"observation space: {env.observation_space}")
    for feature, des in env.observation_map.items():
      print(f"feature: {feature} -> {des}")

    print(f"reset: {obs}")
    print("#"*100, end="\n\n")
    video_filename = os.path.join(game_path, 'vid.mp4')
    max_steps = 100
    # Evaluation
    with imageio.get_writer(video_filename, fps=24) as video:
      obs = env.reset()
      done = False
      total_reward = 0
      for step in range(max_steps):
          action = env.action_space.sample()
          obs, reward, done, info = env.step(action)
          next_obs = obs  # Get agent's position directly from the environment
          total_reward += reward
          re = env.render()
          video.append_data(re)
          if done:
            score = info['score']
            print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
            break
    # Return the HTML object so the notebook can display it when the function is called
    return embed_mp4(video_filename)

# Your Work begins here

## Preprocess

**Define the `preprocess` Method**

- Create a function called `preprocess` that takes an observation as input.
- The purpose of this function is to transform the raw observation into a format that optimizes learning for the agent. Use your creativity to design an effective preprocessing strategy for this task.

**Suggested Approaches:**
- **Normalization or Scaling**: Adjust observation values to a common range, such as $0$ to $1$ or $0$ to $N$, to improve stability in the learning process.
- **Feature Extraction**: Emphasize key features in the observation that are crucial for achieving the environment's goals.
- **Dimensionality Reduction**: Simplify the observation by removing or combining less significant parts to streamline learning and reduce complexity.

**Compression and Q-Table Size**
- Reducing the complexity of the observation, through techniques like scaling, normalization, or removing redundant features, effectively compresses the data. This compression simplifies the agent’s perception of the environment, helping it focus on the most relevant information.
- In discrete environments, reducing observation dimensionality directly impacts the size of the Q-table. By compressing the observation space, you decrease the number of possible states, which makes the Q-table smaller and easier to manage. This reduction in Q-table size not only conserves memory but can also enhance learning efficiency.
  
Use these ideas to explain how your preprocessing strategy compresses the observation space and helps the agent handle the environment more effectively.

**Rationalize Your Preprocessing Choices**

- Include a brief explanation of your preprocessing choices. Explain why you believe this approach will improve the agent's learning efficiency and performance.
- Discuss how these adjustments make the observations more interpretable or manageable for the learning algorithm.

**Important**
- The maximum value on the **Vertical** axis is **512**.
- The maximum value on the **Horizontal** axis is **288**.
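
To make the scaling and dimensionality-reduction ideas concrete, here is a minimal sketch that keeps only two relative features, clips them, and buckets them. The function names, the clipping range `max_dist=100`, and the bucket width `bucket=10` are illustrative assumptions, not recommended settings.

```python
def clip(value, low, high):
    """Limit `value` to the closed interval [low, high]."""
    return max(low, min(high, value))


def discretize(obs, max_dist=100, bucket=10):
    """Map a raw observation to a small (dx_bucket, dy_bucket) state.

    dx: horizontal distance to the next pipe, clipped to [0, max_dist].
    dy: gap centre minus bird height, clipped to [-max_dist, max_dist].
    Velocity (obs[1]) is ignored in this sketch.
    """
    player_y, _, dist, top, bottom = obs[:5]
    gap_mid = (top + bottom) / 2
    dy = clip(gap_mid - player_y, -max_dist, max_dist)
    dx = clip(dist, 0, max_dist)
    # Floor division groups nearby values into the same bucket.
    return (int(dx // bucket), int(dy // bucket))


# Made-up observations, for illustration only.
far_state = discretize([256, 0, 309, 144, 224, 453, 160, 240])
near_state = discretize([180, 0, 35, 144, 224, 179, 160, 240])
print(far_state, near_state)

# With these settings there are 11 dx buckets (0..10) and 21 dy buckets (-10..10).
n_states = 11 * 21
print(n_states)
```

With these particular settings the Q-table has 231 states, compared with the hundreds of raw values per feature in the original observation.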


In [12]:
custome_observation_map = {} # assign each feature index a description of the feature's meaning
def preprocess(obs):

  def extract_features(obs):
    # Basic feature extraction
    gap_mid_point = (obs[3] + obs[4]) / 2
    return {"gap_mid_point": gap_mid_point,
            "above_middle": gap_mid_point - obs[0],
            }

  # Extract features
  features = extract_features(obs)
  processed_obs = int(bool(features["above_middle"] > -6))

  # Print feature details
  # for feaure, des in env.observation_map.items():
  #   print(f"feaure: {feaure} -> {des}")

  # Using a single boolean feature!
  '''
  This can be expanded, but we've already proven that choosing an action
  deterministically based on this feature is enough to pass 10+ pipes with
  default distance between them. Perhaps it's not enough to pass a harder game,
  where the pipes are closer
  '''
  return processed_obs

def preprocess_2(obs):

    processed_obs = {}

    vel = obs[1]
    x0 = obs[2]
    x1 = obs[5]
    y0 = int((int(obs[3]) + int(obs[4])) / 2) - obs[0]
    #y1 = int((int(obs[6]) + int(obs[7])) / 2) - obs[0]
    if 0 <= x1 < 50 and x0 < x1:
        y1 = int((int(obs[6]) + int(obs[7])) / 2) - obs[0]
    else:
        y1 = 0

    processed_obs["x0"] = int(bool(x0 < 40))
    processed_obs["y0"] = int(bool(y0 > -4))
    processed_obs["vel"] = int(bool(vel >= 0))
    processed_obs["x1"] = int(bool(x1 < 60) and x0 < x1)
    #processed_obs["y1"] = int(bool(y1 > -5))
    processed_obs["y1"] = int(bool(y1 > -6 and y1 < y0))

    return processed_obs


# Current observation space dimensionality:
  # 8 features, each with about 500 possible values.
  # That's roughly 500^8 possible states.
  # Quite large.
  # The probability of visiting the same state twice is low.
  # A tabular method such as Q-learning only updates the states it actually visits
  # (note that Q-learning itself is off-policy; SARSA is its on-policy counterpart),
  # so we can't afford to keep working in such a high dimension.

# Potential preprocessing improvements
    # If the width of the gap is constant - can collapse the 'top' and 'bottom' location of next gap to a single variable
    # Same applies to the next-next gap.
    # Two suggestions above can reduce size of observation vector from 8 to 6
    # No need to remember the vertical location of self, next, and next-next gap
      # We can store the information in two variables: (self_height - next_height) and (self_height - next_next_height)
      # We don't care for the absolute location of all three objects, only to the relationship between them
      # This is if we ignore options of hitting the floor or ceiling.
      # It makes sense to ignore them, since the gaps are usually not too close to them to be relevant
      # And we expect our height will be somewhere around the next gap
      # This removes one more variable, so the obs space is 5-dimensional
    # Reduce granularity: we don't need a range of 0-500 in our variables
      # Only close things matter. I don't really care if the next gap is 300 above me or 200. Either way I must go up.
      # We can 'trim' the values so that anything above 100, for example, is set to 100
      # This will allow us to represent each value of obs with 7 bits
      # So with previous suggestions this means we represent an obs space with 5x7 = 35 bits
    # Consider transforming numerical values to booleans
      # Bird's vertical velocity can be reduced to a boolean - going up or down
      # The location of the next-next gap can be reduced to a bool - is the gap above or below the next gap?
      # These two options together reduce 2 of the 5 variables to a single bit
      # So we can represent the state with 3x7 + 2 = 23 bits
    # Consider quantizing some of the values.
      # e.g. the distance to the next gap can be represented as (dist // 5)
      # Same can be done with difference of height between bird and next gap
    # Consider removing information about next-next gap. Maybe we can succeed without it altogether
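
The size estimates in the comments above can be checked with a few lines of arithmetic. This is only a sketch of the counting argument: it assumes about 500 values per raw feature and 7 bits (128 values) per clipped feature, as stated in the comments, and the variable names are made up for this example.

```python
import math

# 8 raw features with roughly 500 possible values each.
raw_states = 500 ** 8

# 5 relative features, each clipped so it fits in 7 bits (35 bits total).
relative_states = 128 ** 5

# 3 clipped features plus 2 booleans (3 * 7 + 2 = 23 bits).
mixed_states = 128 ** 3 * 2 ** 2

# 5 boolean features, the shape returned by `preprocess_2`.
boolean_states = 2 ** 5

for name, count in [
    ("raw", raw_states),
    ("relative", relative_states),
    ("mixed", mixed_states),
    ("boolean", boolean_states),
]:
    # log2 gives the number of bits needed to index the state space.
    print(f"{name:>8}: {count} states, about {math.log2(count):.1f} bits")
```

Each step shrinks the table by many orders of magnitude, which is what makes it realistic for the agent to revisit states often enough to learn their values.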

## Reward shaping

### Define a Reward Shaping Strategy

**Reward shaping** means adjusting the rewards given to the agent to encourage desired actions and help it learn faster. In *Flappy Bird*, the goal is to create rewards that guide the bird to survive longer and avoid hitting pipes.

**Steps for Reward Shaping:**

1. **Identify Desired Behaviors**:
   - Decide which behaviors you want to encourage. For example, flapping at the right time to pass through pipes or avoiding unnecessary flaps could be rewarded.

2. **Design Rewards**:
   - Use rewards that motivate the bird to take helpful actions without distracting from the main goal (staying alive). Here are some ideas:
     - **Survival Reward**: Give a small reward for each step the bird survives.
     - **Penalty for Hitting Pipes**: Apply a negative reward (penalty) when the bird crashes into a pipe.
     - **Passing Through Pipes**: Provide a positive reward each time the bird successfully passes through a pipe.


3. **Explain Your Reward Choices**:
   - Add an explanation for your reward design. Why did you decide to reward certain actions or apply penalties?
   - Describe how your rewards help the bird focus on staying alive and navigating pipes effectively.
   - Provide an example demonstrating the impact of your reward shaping logic, ideally by comparing performance graphs with and without reward shaping.

4. **Avoid Over-Shaping**:
   - Avoid giving too many rewards for minor actions, as it could lead the bird to focus on short-term rewards instead of the main goal. Aim for a balance where the bird gets enough guidance without relying on intermediate rewards.

**Q-Learning and Value Convergence**:
   - Remember, shaping rewards impacts **Q-values** (the expected value of actions). When rewards are well-designed, the bird’s Q-values will better reflect valuable actions, helping it make smarter choices over time.

Use these tips to develop a reward strategy that encourages the bird to navigate through pipes effectively while improving its learning.
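
To show where a shaped reward enters the Q-values, here is a minimal sketch of the standard tabular Q-learning update. The table layout, the function name `q_update`, and the values `alpha=0.1` and `gamma=0.9` are assumptions for illustration. Note also that Q-learning bootstraps from the best next action and is therefore off-policy; an on-policy variant such as SARSA would use the action actually taken next.

```python
from collections import defaultdict


def q_update(q_table, state, action, reward, next_state, done,
             alpha=0.1, gamma=0.9):
    """Apply one tabular Q-learning update and return the new Q-value."""
    # No future value is available after a terminal transition.
    best_next = 0.0 if done else max(q_table[next_state])
    td_target = reward + gamma * best_next
    q_table[state][action] += alpha * (td_target - q_table[state][action])
    return q_table[state][action]


# Two actions per state, all values initialised to zero.
q_table = defaultdict(lambda: [0.0, 0.0])

# A survival step with a shaped reward of +1.
after_survival = q_update(q_table, state=0, action=1, reward=1,
                          next_state=1, done=False)

# A crash with a shaped reward of -50.
after_crash = q_update(q_table, state=1, action=0, reward=-50,
                       next_state=1, done=True)
print(after_survival, after_crash)
```

Because the reward appears directly in the update target, a large terminal penalty lowers the value of the crashing action much faster than a small survival bonus raises the value of safe actions.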


In [13]:
def reward_shaping(reward, terminal, env):
  # Added a third argument, `env`, to the function supplied in the exercise

  SURVIVAL_REWARD = 1
  TERMINAL_PUNISHMENT = 50  # This sounds so evil

  # Reward surviving another step
  reward += SURVIVAL_REWARD

  # Other optional rewards/punishments
    # Reward moving towards the middle of the gap
    # Punish being beyond the boundaries of the upcoming gap
    # Small punishment for not being within the next-next gap

  # Punish terminal states (user lost)
  if terminal:
    reward -= TERMINAL_PUNISHMENT

  return reward

## Implementation of Learning Agents

Abstract Class for Implementing an On-Policy Agent

Below is an abstract class to serve as a foundation for implementing an on-policy agent. Derive your agent from this class and implement the specific logic for your agent. Note that different agents may require additional parameters or functions to support their unique features and learning mechanisms.


In [14]:
from abc import ABC, abstractmethod

class OnPolicyAgent(ABC):
    def __init__(self, action_space, observation_space, gamma, learning_rate, epsilon=None):
        """
        Initializes the on-policy RL agent.
        You can add more parameters
        Parameters:
        - action_space: The action space of the environment
        - observation_space: The observation space of the environment
        - gamma: Discount factor for future rewards
        - learning_rate: Learning rate for policy updates
        """
        self.action_space = action_space
        self.observation_space = observation_space
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.policy = None  # Placeholder for policy (to be implemented in subclasses)

    @abstractmethod
    def select_action(self, observation, deterministic=False):
        """
        Select an action based on the current policy.

        Parameters:
        - observation: Current state observation from the environment
        - deterministic: Flag indicating whether to use a deterministic policy (default is False)
        Returns:
        - action: Chosen action based on the policy
        """
        pass

    @abstractmethod
    def update_policy(self, transition):
        """
        Update the policy based on the current transition.

        Parameters:
        - transition: Data collected from interactions with the environment (tuple of zero or more values such as action, reward, etc.)
        """
        pass

    @abstractmethod
    def train(self, env, num_episodes, log_every):
        """
        Train the agent over a specified number of episodes.

        Parameters:
        - env: The environment to train in
        - num_episodes: Number of training episodes
        - log_every: Logging interval (how often progress is printed during training)
        """
        pass

    def save_policy(self, file_path):
        """
        Save the current policy to a file.

        Parameters:
        - file_path: Path to the file where the policy should be saved
        """
        print(f"Saving policy to {file_path}: {json.dumps(self.policy, indent=4)}")
        with open(file_path, 'w') as f:
            json.dump(self.policy, f)

    def load_policy(self, file_path):
        """
        Load a policy from a file.

        Parameters:
        - file_path: Path to the file from which the policy should be loaded
        """
        print(f"Loading policy from {file_path}")
        with open(file_path, 'r') as f:
            policy = json.load(f)

        typeof_dict = type(list(policy.values())[0]).__name__
        if typeof_dict == "dict":
            self.policy = {
                int(state_key): {
                    int(action): value
                    for action, value in actions_dict.items()
                }
                for state_key, actions_dict in policy.items()
            }
        else:
            self.policy = {
                state_key: actions_list
                for state_key, actions_list in policy.items()
            }

        print(f"Loaded policy: {json.dumps(self.policy, indent=4)}")

    @abstractmethod
    def run_policy(self, env):
        """
        Run the loaded policy in the environment for one episode.

        Parameters:
        - env: The environment in which to run the policy

        Returns:
        - total_reward: Total reward accumulated in the episode
        """
        pass
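
The integer conversion in `load_policy` is needed because JSON object keys are always strings. The sketch below shows the round trip on a made-up policy table; the nested-dict layout mirrors the one `load_policy` handles, but the values are invented.

```python
import json

# Hypothetical 2x2 policy table: {state: {action: value}} with integer keys.
policy = {0: {0: 0.25, 1: 0.75}, 1: {0: 0.6, 1: 0.4}}

# Serialising and parsing again turns every integer key into a string.
restored_raw = json.loads(json.dumps(policy))
print(list(restored_raw.keys()))

# Convert state and action keys back to integers, as `load_policy` does.
restored = {
    int(state): {int(action): value for action, value in actions.items()}
    for state, actions in restored_raw.items()
}
print(restored == policy)
```

Without this conversion, a lookup such as `policy[0]` would raise a `KeyError` after loading, because the stored key is the string `"0"`.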


## Training

#### Experimentation Instructions

In this section, you will conduct a series of experiments to solve the environment using your on-policy agent. Follow these guidelines for a clear and comprehensive presentation of your work:

1. **Design and Run Experiments**:
   - Perform at least **5 distinct experiments** with your agent, including:
     - **2 unsuccessful experiments** where the agent fails to learn the environment (reaching a score below the target).
     - **1 successful experiment** where the agent achieves a score of **10** or higher. (Remember, the score is defined by the number of pipes the agent has passed.)

   - *Note*: In practice, you are expected to try many configurations to observe various outcomes, even beyond these 3 experiments.

2. **Provide Detailed Summaries**:
   - For each experiment, include:
     - **Graphs** that illustrate the training process, such as reward progression, steps, score, and any other relevant metrics (there are many more metrics to learn from).
     - **Explanations of Hyperparameters**: Clearly describe the configuration of hyperparameters chosen for each experiment, and the reasoning behind these choices.
     - **Preprocessing and Reward Shaping**: Explain any data preprocessing or reward shaping techniques applied, along with the motivation for these techniques.

3. **Analyze and Reflect**:
   - **Result Analysis**: Analyze each experiment's outcome, focusing on why the agent may have succeeded or failed based on your configurations.
   - **Insights and Deductions**: Draw insights from each experiment, noting what worked and what didn’t. Use these insights to refine your understanding of the environment and agent performance.

4. **Final Report**:
   - **Synthesize Findings**: Compile all observations, insights, and analysis from your experiments into a final report. This report should showcase your learnings and demonstrate a deep understanding of the experimentation process.
   - **Emphasis on Analysis**: The analysis of your results and the conclusions drawn are the most critical aspects of this exercise. Dedicate ample attention to this section, as it will reflect your ability to interpret and understand the agent’s learning process.

**Remember**: The goal of this exercise is to experiment, learn, and iterate. Document your process thoroughly, as this will form the foundation of your report.
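
Per-episode metrics are usually noisy, so a smoothed curve is often easier to read in the graphs. The following sketch computes a trailing moving average that can be plotted next to the raw values. The helper name, the window size, and the sample scores are made up for illustration.

```python
def moving_average(values, window=3):
    """Return the trailing mean over at most `window` most recent values."""
    averages = []
    running_sum = 0.0
    for i, value in enumerate(values):
        running_sum += value
        if i >= window:
            # Drop the value that just left the window.
            running_sum -= values[i - window]
        averages.append(running_sum / min(i + 1, window))
    return averages


# Hypothetical per-episode scores, for illustration only.
episode_scores = [0, 0, 1, 3, 2, 6]
smoothed = moving_average(episode_scores, window=3)
print(smoothed)
```

The output has the same length as the input, so both series can share one x-axis (the episode index) in a plot.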


In [None]:
class TrivialPolicy(OnPolicyAgent):
    def __init__(self, action_space, observation_space, gamma, learning_rate):
        """
        Initializes the on-policy RL agent.
        You can add more parameters
        Parameters:
        - action_space: The action space of the environment
        - observation_space: The observation space of the environment
        - gamma: Discount factor for future rewards
        - learning_rate: Learning rate for policy updates
        """
        # Note: hyperparameters aren't being used yet
        super().__init__(action_space, observation_space, gamma, learning_rate)
        print('Initializing with action space:', action_space)
        print('Initializing with observation space:', observation_space)

    def select_action(self, observation, deterministic=False):
        """
        Select an action based on the current policy.

        Parameters:
        - observation: Current state observation from the environment
        - deterministic: Flag indicating whether to use a deterministic policy (default is False)
        Returns:
        - action: Chosen action based on the policy
        """

        # NOTE: 0 is going UP and 1 is going DOWN, unlike instructions.

        # Currently our preprocessing extracts a strong boolean feature.
        # This feature alone allows us to deterministically choose an action.
        # We can 'improve' this by using a simple 2x2 Q-table.
        EPSILON = 0  # Exploration rate; hardcoded here for now
        # `random` is already imported at the top of the notebook
        explore = random.random() < EPSILON

        if explore:
          print("Going by the book is boring. Let's explore")
          random_action = random.randint(0, len(self.action_space) - 1)
          return random_action

        # Exploit: pick the action with the highest stored value for this state
        state_actions = self.policy[observation]
        best_action = max(state_actions, key=state_actions.get)
        return best_action

    def update_policy(self, transition):
        """
        Update the policy based on the current transition.

        Parameters:
        - transition: Data collected from interactions with the environment (tuple of zero or more values such as action, reward, etc.)
        """
        obs, selected_action, new_obs, reward, done = transition

        # This is NOT q-learning.
        # This is just a naive attempt to create _some_ kind of learning
        # print(f"Changing reward for {selected_action} in state {obs} by {reward * self.learning_rate}")
        self.policy[obs][selected_action] += reward * self.learning_rate

    def train(self, env, num_episodes, log_every):
        """
        Train the agent over a specified number of episodes.

        Parameters:
        - env: The environment to train in
        - num_episodes: Number of training episodes

        Here we need to invoke (in a loop)
          * `update_policy`
        """

        # Initialize a random policy
        # Consider other formats for efficiency, e.g. ndarray or pandas df
        policy = {
            state: {
                action: round(random.random(), 4)
                for action in action_space
            }
            for state in observation_space
        }

        # Display initial policy
        print(f"Initialized a random policy: {json.dumps(policy, indent=4)}")
        self.policy = policy

        # Setting up video capturing
        # Currently same video for multiple episodes
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_train.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:

          max_steps = 500
          for episode in range(num_episodes):
            # Resetting environment and getting first observation
            print(f"Starting episode #{episode}")
            obs = env.reset()  # obs is now the initial state
            done = False
            total_reward = 0

            # Training loop
            for step in range(max_steps):

              # Log
              if step % log_every == 0:
                print(f"Step #{step}")

              # Pick an action
              selected_action = self.select_action(obs)

              # Update reward
              new_obs, reward, done, info = env.step(selected_action)

              # Update Policy
              self.update_policy(
                  (obs, selected_action, new_obs, reward, done)
                  )

              # Update the environment after last step
              total_reward += reward
              obs = new_obs

              # Update video
              re = env.render()
              video.append_data(re)

              # Terminate training loop when encountering a terminal state
              if done:
                score = info['score']
                print("Encountered a terminal state")
                print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
                break

            print(f"Completed episode #{episode}.")
            print(f"Updated policy: {json.dumps(self.policy, indent=4)}")

        return video_filename

    def run_policy(self, env):
        """
        Run the loaded policy in the environment for one episode.

        Parameters:
        - env: The environment in which to run the policy

        Returns:
        - video_filename: Path to the recorded evaluation video
        """
        print(f"run_policy invoked")
        obs = env.reset()  # obs is now the initial state
        done = False
        total_reward = 0

        # Single episode, like method docstring states
        max_steps = 10_000
        # This should be extracted to a helper function and used in training also
        # Need to add a flag for 'train' to allow updating policy
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_eval.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:
          for step in range(max_steps):

            # Select action
            selected_action = self.select_action(obs)

            # Perform action, get reward and new state
            new_obs, reward, done, info = env.step(selected_action)

            # Update the environment after last step
            total_reward += reward
            obs = new_obs

            # Update video
            re = env.render()
            video.append_data(re)

            # Terminate the episode when encountering a terminal state
            if done:
              score = info['score']
              print("Encountered a terminal state")
              print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
              break

        return video_filename

In [None]:
class QLearningPolicy(OnPolicyAgent):
    def __init__(self, action_space, observation_space, gamma, learning_rate):
        """
        Initializes the on-policy RL agent.
        You can add more parameters
        Parameters:
        - action_space: The action space of the environment
        - observation_space: The observation space of the environment
        - gamma: Discount factor for future rewards
        - learning_rate: Learning rate for policy updates
        """
        # gamma and learning_rate are both used in update_policy
        super().__init__(action_space, observation_space, gamma, learning_rate)
        print('Initializing with action space:', action_space)
        print('Initializing with observation space:', observation_space)

    def select_action(self, observation, deterministic=False):
        """
        Select an action based on the current policy.

        Parameters:
        - observation: Current state observation from the environment
        - deterministic: Flag indicating whether to use a deterministic policy (default is False; currently unused)
        Returns:
        - action: Chosen action based on the policy
        """

        # NOTE: 0 is going UP and 1 is going DOWN, unlike instructions.

        EPSILON = 0  # Not the right place to put this
        import random  # Wrong place for this. We have another import of rand
        explore = random.random() < EPSILON

        if explore:
          print(f"Going by the book is boring. Let's explore")
          random_action = random.randint(0, len(self.action_space) - 1)
          return random_action

        # The preprocessing currently extracts a single strong boolean feature,
        # so a simple 2x2 Q-table is enough: pick the highest-valued action
        # for the observed state.
        state_actions = self.policy[observation]
        best_action = max(state_actions, key=state_actions.get)
        return best_action

    def update_policy(self, transition):
        """
        Update the policy based on the current transition.

        Parameters:
        - transition: Data collected from one interaction with the environment, as a tuple (obs, action, new_obs, reward, done)
        """
        obs, selected_action, new_obs, reward, done = transition

        # Q-learning update: bootstrap from the best action value in the *next* state
        current_state_part = (1 - self.learning_rate) * (self.policy[obs][selected_action])
        new_state_part = self.learning_rate * (reward + self.gamma * max(self.policy[new_obs].values()))
        self.policy[obs][selected_action] = current_state_part + new_state_part

    def train(self, env, num_episodes, log_every):
        """
        Train the agent over a specified number of episodes.

        Parameters:
        - env: The environment to train in
        - num_episodes: Number of training episodes

        Here we need to invoke (in a loop)
          * `update_policy`
        """

        # Initialize a random policy
        # Consider other formats for efficiency, e.g. ndarray or pandas df
        policy = {
            state: {
                action: round(random.random(), 4)
                for action in action_space
            }
            for state in observation_space
        }

        # Display initial policy
        print(f"Initialized a random policy: {json.dumps(policy, indent=4)}")
        self.policy = policy

        # Setting up video capturing
        # Currently same video for multiple episodes
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_train.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:

          max_steps = 500
          for episode in range(num_episodes):
            # Resetting environment and getting first observation
            print(f"Starting episode #{episode}")
            obs = env.reset()  # obs is now the initial state
            done = False
            total_reward = 0

            # Training loop
            for step in range(max_steps):

              # Log
              if step % log_every == 0:
                print(f"Step #{step}")

              # Pick an action
              selected_action = self.select_action(obs)

              # Update reward
              new_obs, reward, done, info = env.step(selected_action)

              # Update Policy
              self.update_policy(
                  (obs, selected_action, new_obs, reward, done)
                  )

              # Update the environment after last step
              total_reward += reward
              obs = new_obs

              # Update video
              re = env.render()
              video.append_data(re)

              # Terminate training loop when encountering a terminal state
              if done:
                score = info['score']
                print("Encountered a terminal state")
                print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
                break

            print(f"Completed episode #{episode}.")
            print(f"Updated policy: {json.dumps(self.policy, indent=4)}")

        return video_filename

    def run_policy(self, env):
        """
        Run the loaded policy in the environment for one episode.

        Parameters:
        - env: The environment in which to run the policy

        Returns:
        - video_filename: Path to the recorded evaluation video
        """
        print(f"run_policy invoked")
        obs = env.reset()  # obs is now the initial state
        done = False
        total_reward = 0

        # Single episode, like method docstring states
        max_steps = 10_000
        # This should be extracted to a helper function and used in training also
        # Need to add a flag for 'train' to allow updating policy
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_eval.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:
          for step in range(max_steps):

            # Select action
            selected_action = self.select_action(obs)

            # Perform action, get reward and new state
            new_obs, reward, done, info = env.step(selected_action)

            # Update the environment after last step
            total_reward += reward
            obs = new_obs

            # Update video
            re = env.render()
            video.append_data(re)

            # Terminate the episode when encountering a terminal state
            if done:
              score = info['score']
              print("Encountered a terminal state")
              print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
              break

        return video_filename

In [15]:
class QLearningPolicy2(OnPolicyAgent):
    def __init__(self, action_space, observation_space, gamma, learning_rate, epsilon):
        """
        Initializes the on-policy RL agent.
        You can add more parameters
        Parameters:
        - action_space: The action space of the environment
        - observation_space: The observation space of the environment
        - gamma: Discount factor for future rewards
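        - learning_rate: Learning rate for policy updates
        - epsilon: Exploration rate (accepted for a uniform constructor signature)
        """
        # Note: epsilon is currently not forwarded (None is passed to the base class);
        # select_action uses a hard-coded EPSILON instead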
        super().__init__(action_space, observation_space, gamma, learning_rate, None)
        print('Initializing with action space:', action_space)
        print('Initializing with observation space:', observation_space)

    def get_state_key(self, observation):
        # Build a key such as "0_1_0_0_1" from the discretized observation features
        state_key = "_".join(str(int(observation[k])) for k in ('x0', 'y0', 'vel', 'x1', 'y1'))

        if self.policy.get(state_key) is None:
            self.policy[state_key] = [0, 0] #[round(random.random(), 4), round(random.random(), 4)]

        return state_key

    def select_action(self, observation, deterministic=False):
        """
        Select an action based on the current policy.

        Parameters:
        - observation: Current state observation from the environment
        - deterministic: Flag indicating whether to use a deterministic policy (default is False; currently unused)
        Returns:
        - action: Chosen action based on the policy
        """

        # NOTE: 0 is going UP and 1 is going DOWN, unlike instructions.

        EPSILON = 0  # Not the right place to put this
        import random  # Wrong place for this. We have another import of rand
        explore = random.random() < EPSILON

        if explore:
          print(f"Going by the book is boring. Let's explore")
          random_action = random.randint(0, len(self.action_space) - 1)
          return random_action

        new_state = self.get_state_key(observation)

        return 1 if self.policy[new_state][0] < self.policy[new_state][1] else 0

    def update_policy(self, transition):
        """
        Update the policy based on the current transition.

        Parameters:
        - transition: Data collected from one interaction with the environment, as a tuple (obs, action, new_obs, reward, done)
        """
        obs, selected_action, new_obs, reward, done = transition

        print(obs)

        current_state_part = (1 - self.learning_rate) * (self.policy[self.get_state_key(obs)][selected_action])
        new_state_part = self.learning_rate * (reward + self.gamma * max(self.policy[self.get_state_key(new_obs)][0:2]))
        self.policy[self.get_state_key(obs)][selected_action] = current_state_part + new_state_part

    def train(self, env, num_episodes, log_every):
        """
        Train the agent over a specified number of episodes.

        Parameters:
        - env: The environment to train in
        - num_episodes: Number of training episodes

        Here we need to invoke (in a loop)
          * `update_policy`
        """

        # Start from a zero-initialized Q-table; get_state_key adds unseen states lazily
        # Consider other formats for efficiency, e.g. ndarray or pandas df
        self.policy = {}
        self.policy["0_0_0_0_0"] = [0, 0]

        # Setting up video capturing
        # Currently same video for multiple episodes
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_train.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:

          max_steps = 500
          for episode in range(num_episodes):
            # Resetting environment and getting first observation
            print(f"Starting episode #{episode}")
            obs = env.reset()  # obs is now the initial state
            done = False
            total_reward = 0

            # Training loop
            for step in range(max_steps):

              # Log
              if step % log_every == 0:
                print(f"Step #{step}")
                str_print = ""
                for k, v in obs.items():
                    str_print += f"{k} is {v},"
                print(str_print)

              # Pick an action
              selected_action = self.select_action(obs)

              # Update reward
              new_obs, reward, done, info = env.step(selected_action)

              # Update Policy
              self.update_policy(
                  (obs, selected_action, new_obs, reward, done)
                  )

              # Update the environment after last step
              total_reward += reward
              obs = new_obs

              # Update video
              re = env.render()
              video.append_data(re)

              # Terminate training loop when encountering a terminal state
              if done:
                score = info['score']
                print("Encountered a terminal state")
                print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
                break

            print(f"Completed episode #{episode}.")
            print(f"Updated policy: {json.dumps(self.policy, indent=4)}")

        return video_filename

    def run_policy(self, env):
        """
        Run the loaded policy in the environment for one episode.

        Parameters:
        - env: The environment in which to run the policy

        Returns:
        - video_filename: Path to the recorded evaluation video
        """
        print(f"run_policy invoked")
        obs = env.reset()  # obs is now the initial state
        done = False
        total_reward = 0

        # Single episode, like method docstring states
        max_steps = 10_000
        # This should be extracted to a helper function and used in training also
        # Need to add a flag for 'train' to allow updating policy
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_eval.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:
          for step in range(max_steps):

            # Select action
            selected_action = self.select_action(obs)

            # Perform action, get reward and new state
            new_obs, reward, done, info = env.step(selected_action)

            # Update the environment after last step
            total_reward += reward
            obs = new_obs

            # Update video
            re = env.render()
            video.append_data(re)

            # Terminate the episode when encountering a terminal state
            if done:
              score = info['score']
              print("Encountered a terminal state")
              print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
              break

        return video_filename

In [16]:
class SarsaPolicy2(OnPolicyAgent):
    def __init__(self, action_space, observation_space, gamma, learning_rate, epsilon):
        """
        Initializes the on-policy RL agent.
        You can add more parameters
        Parameters:
        - action_space: The action space of the environment
        - observation_space: The observation space of the environment
        - gamma: Discount factor for future rewards
        - learning_rate: Learning rate for policy updates
        - epsilon: Exploration rate, used to weight the update target in update_policy
        """
        # gamma, learning_rate and epsilon are all used in update_policy
        super().__init__(action_space, observation_space, gamma, learning_rate, epsilon)
        print('Initializing with action space:', action_space)
        print('Initializing with observation space:', observation_space)

    def get_state_key(self, observation):
        # Build a key such as "0_1_0_0_1" from the discretized observation features
        state_key = "_".join(str(int(observation[k])) for k in ('x0', 'y0', 'vel', 'x1', 'y1'))

        if self.policy.get(state_key) is None:
            self.policy[state_key] = [0, 0]#[round(random.random(), 4), round(random.random(), 4)]

        return state_key

    def select_action(self, observation, deterministic=False):
        """
        Select an action based on the current policy.

        Parameters:
        - observation: Current state observation from the environment
        - deterministic: Flag indicating whether to use a deterministic policy (default is False; currently unused)
        Returns:
        - action: Chosen action based on the policy
        """

        # NOTE: 0 is going UP and 1 is going DOWN, unlike instructions.

        EPSILON = 0  # Not the right place to put this
        import random  # Wrong place for this. We have another import of rand
        explore = random.random() < EPSILON

        if explore:
          print(f"Going by the book is boring. Let's explore")
          random_action = random.randint(0, 1)
          return random_action

        new_state = self.get_state_key(observation)

        return 1 if self.policy[new_state][0] < self.policy[new_state][1] else 0

    def update_policy(self, transition):
        """
        Update the policy based on the current transition.

        Parameters:
        - transition: Data collected from one interaction with the environment, as a tuple (obs, action, new_obs, reward, done)
        """
        obs, selected_action, new_obs, reward, done = transition

        print(obs)

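        # Expected-SARSA-style target: the next-state value is the expectation of Q(s', .)
        # under an epsilon-greedy policy (uniform with probability epsilon, greedy otherwise)
        current_state_part = (1 - self.learning_rate) * (self.policy[self.get_state_key(obs)][selected_action])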
        new_state_part = self.learning_rate * \
            (reward + self.gamma * (self.epsilon * mean(self.policy[self.get_state_key(new_obs)][0:2]) + \
             (1 - self.epsilon) * max(self.policy[self.get_state_key(new_obs)][0:2])))
        self.policy[self.get_state_key(obs)][selected_action] = current_state_part + new_state_part

    def train(self, env, num_episodes, log_every):
        """
        Train the agent over a specified number of episodes.

        Parameters:
        - env: The environment to train in
        - num_episodes: Number of training episodes

        Here we need to invoke (in a loop)
          * `update_policy`
        """

        # Start from a zero-initialized Q-table; get_state_key adds unseen states lazily
        # Consider other formats for efficiency, e.g. ndarray or pandas df
        self.policy = {}
        self.policy["0_0_0_0_0"] = [0, 0]

        # Setting up video capturing
        # Currently same video for multiple episodes
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_train.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:

          max_steps = 2000
          for episode in range(num_episodes):
            # Resetting environment and getting first observation
            print(f"Starting episode #{episode}")
            obs = env.reset()  # obs is now the initial state
            done = False
            total_reward = 0

            # Training loop
            for step in range(max_steps):

              # Log
              if step % log_every == 0:
                print(f"Step #{step}")
                str_print = ""
                for k, v in obs.items():
                    str_print += f"{k} is {v},"
                print(str_print)


              # Pick an action
              selected_action = self.select_action(obs)

              # Update reward
              new_obs, reward, done, info = env.step(selected_action)

              # Update Policy
              self.update_policy(
                  (obs, selected_action, new_obs, reward, done)
                  )

              # Update the environment after last step
              total_reward += reward
              obs = new_obs

              # Update video
              re = env.render()
              video.append_data(re)

              # Terminate training loop when encountering a terminal state
              if done:
                score = info['score']
                print("Encountered a terminal state")
                print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
                break

            print(f"Completed episode #{episode}.")
            print(f"Updated policy: {json.dumps(self.policy, indent=4)}")

        return video_filename

    def run_policy(self, env):
        """
        Run the loaded policy in the environment for one episode.

        Parameters:
        - env: The environment in which to run the policy

        Returns:
        - video_filename: Path to the recorded evaluation video
        """
        print(f"run_policy invoked")
        obs = env.reset()  # obs is now the initial state
        done = False
        total_reward = 0

        # Single episode, like method docstring states
        max_steps = 10_000
        # This should be extracted to a helper function and used in training also
        # Need to add a flag for 'train' to allow updating policy
        video_filename = os.path.join(game_path ,f'{type(self).__name__.lower()}_eval.mp4')
        with imageio.get_writer(video_filename, fps=24) as video:
          for step in range(max_steps):

            # Select action
            selected_action = self.select_action(obs)

            # Perform action, get reward and new state
            new_obs, reward, done, info = env.step(selected_action)

            # Update the environment after last step
            total_reward += reward
            obs = new_obs

            # Update video
            re = env.render()
            video.append_data(re)

            # Terminate the episode when encountering a terminal state
            if done:
              score = info['score']
              print("Encountered a terminal state")
              print("Sum of rewards =", total_reward, ", Number of steps=", step, "score: ", score)
              break

        return video_filename

In [None]:
env.observation_space.shape

(0,)

In [17]:
# Prepare environment
# With pipe gap of 65 it looks so bad
env = Game(custom_obs=True, preprocess=preprocess, pipe_gap=80, reward_shaping=reward_shaping, custome_observation_map=custome_observation_map)
env.seed(42)

env_2 = Game(custom_obs=True, preprocess=preprocess_2, pipe_gap=65, reward_shaping=reward_shaping, custome_observation_map=custome_observation_map)
env_2.seed(42)

# Policy dictionary.
# Consider making a dataclass or something similar
# We'll also save a score for each policy so we need to save it somehow
# So a more elaborate data structure could be useful. Namedtuple?
policy_dict = {
  #TrivialPolicy: 'trivial_policy.json',
  #QLearningPolicy: 'qlearning_policy.json',
  QLearningPolicy2: 'qlearning_policy2.json',
  SarsaPolicy2: 'sarsa_policy2.json'
}

train_videos = {}

# Set experiment
episodes = 20
log_every = 100  # Log every 100 steps

# For now - common hyperparameter values for q-learning as default
# These are passed to every policy constructed below.
hyperparameters = {
    'gamma': 0.9,
    'learning_rate': 0.1,
    'epsilon': 0.3
}

# Action space
action_space = env.env.getActionSet()  # Not sure this makes sense. Weird values (None, 119); 119 is the pygame key code for 'w'.
action_space = [0, 1]  # Overrides the line above. Not sure this makes sense either. 0: fall, 1: fly.

# Observation space
# If we change the preprocessing function - this should also be updated
observation_space = [0, 1]

# Training all policies
# We might want to break this loop and treat each policy separately
for policy_cls, policy_file in policy_dict.items():
    # policy_cls is a class, so check its own __name__ (type(policy_cls).__name__ is just 'type')
    policy_env = env_2 if "2" in policy_cls.__name__ else env
    policy = policy_cls(
        policy_env.action_space,
        policy_env.observation_space,
        hyperparameters['gamma'],
        hyperparameters['learning_rate'],
        hyperparameters['epsilon']
        )
    train_video = policy.train(policy_env, episodes, log_every)
    policy.save_policy(policy_file)
    train_videos[type(policy).__name__] = train_video

Streaming output truncated to the last 5000 lines.
{'x0': 0, 'y0': 0, 'vel': 1, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 0, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 1, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 0, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 1, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 0, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 1, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 0, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 0, 'vel': 1, 'x1': 0, 'y1': 0}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 1, 'x1': 0, 'y1': 1}
{'x0': 0, 'y0': 1, 'vel': 0, 'x1': 0, 'y1'

In [18]:
# Video of learning process

embed_mp4(train_videos['QLearningPolicy2'])

In [19]:
# Video of learning process

embed_mp4(train_videos['SarsaPolicy2'])

## Validation

#### Validation Section

In this section, you will validate your agent’s performance using the best-performing configuration identified in the experimentation phase. This validation step should be less exhaustive than the experimentation section but should provide sufficient evidence of your agent's consistency and stability.

1. **Run Validation Trials**:
   - Using the chosen configuration, run the agent in the environment for **multiple trials** (e.g., 5-10 episodes) to test its reliability.
   - Observe the agent's performance across these trials to confirm whether it consistently achieves or exceeds the target score of 10.

2. **Provide Key Summaries**:
   - **Performance Metrics**: Summarize the agent’s performance with key metrics such as average score, highest score, and variance across trials.
   - **Graphs** (Optional): Include basic graphs showing score progression or any other relevant metric, if they provide additional insight into the agent's stability.

3. **Brief Analysis**:
   - Reflect on the agent's performance in validation. Note any patterns or inconsistencies and provide a short explanation of how these results align (or differ) from your expectations based on the experimentation phase.

*Note*: This validation section should confirm and strengthen the results of your experimentation, focusing on reliability and consistency rather than extensive analysis.

*Note*: Store the agent's policy in files, since you are asked to provide an additional notebook that loads the agent's policy.
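
The metrics requested above (average score, highest score, variance, and how often the target of 10 is reached) can be computed with a small helper. This is a minimal sketch, not part of the notebook's code: `summarize_scores` is a hypothetical name, and the scores in the example are made-up placeholder values rather than results from this agent.

```python
from statistics import mean, pvariance


def summarize_scores(scores, target=10):
    """Summarize per-episode scores collected from validation trials."""
    if not scores:
        raise ValueError("scores must contain at least one episode")
    return {
        "episodes": len(scores),
        "mean": mean(scores),
        "max": max(scores),
        # Population variance across the validation episodes
        "variance": pvariance(scores),
        # Fraction of episodes that reached the target score
        "success_rate": sum(s >= target for s in scores) / len(scores),
    }


# Placeholder scores for illustration only (not measured results)
example_scores = [12, 9, 15, 10, 11]
summary = summarize_scores(example_scores)
print(summary)
```

In the notebook, the list of scores would presumably be filled from `info['score']` at the end of each validation episode.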


In [20]:
eval_videos = {}
env = Game(custom_obs=True, preprocess=preprocess, pipe_gap=80, reward_shaping=reward_shaping, custome_observation_map=custome_observation_map)
env.seed(42)
env_2 = Game(custom_obs=True, preprocess=preprocess_2, pipe_gap=65, reward_shaping=reward_shaping, custome_observation_map=custome_observation_map)
env_2.seed(42)

policy_dict = {
  #TrivialPolicy: 'trivial_policy.json',
  #QLearningPolicy: 'qlearning_policy.json',
  QLearningPolicy2: 'qlearning_policy2.json',
  SarsaPolicy2: 'sarsa_policy2.json'
}

# Evaluating all trained policies
# We might want to break this loop and treat each policy separately
for policy_cls, policy_file in policy_dict.items():
    # policy_cls is a class, so check its own __name__ (type(policy_cls).__name__ is just 'type')
    policy_env = env_2 if "2" in policy_cls.__name__ else env
    policy = policy_cls(
        policy_env.action_space,
        policy_env.observation_space,
        hyperparameters['gamma'],
        hyperparameters['learning_rate'],
        hyperparameters['epsilon']
        )
    policy.load_policy(policy_file)
    eval_video = policy.run_policy(policy_env)
    eval_videos[type(policy).__name__] = eval_video

# Need to run the policy 5+ times and extract statistics
# e.g. percentage of runs where we passed 10+ pipes

Initializing with action space: Discrete(2)
Initializing with observation space: Box([], [], (0,), int64)
Loading policy from qlearning_policy2.json
Loaded policy: {
    "0_0_0_0_0": [
        -0.06420417080613472,
        1.6545545365428893
    ],
    "0_0_1_0_0": [
        7.95264126833302,
        0
    ],
    "0_1_0_0_1": [
        8.640863403016375,
        -5.509072938706403
    ],
    "0_1_1_0_1": [
        -0.11873586467394384,
        8.508324869633089
    ],
    "0_1_0_0_0": [
        7.1496553457961145,
        0
    ],
    "0_1_1_0_0": [
        -0.21599139997669936,
        8.201814437886027
    ],
    "1_1_1_0_1": [
        -0.23542290762415555,
        7.853304748666047
    ],
    "1_1_0_0_1": [
        7.662491497226297,
        -8.854931304532267
    ],
    "1_1_1_0_0": [
        -0.22489104491000006,
        7.542671951197177
    ],
    "1_0_1_0_0": [
        7.385115755117756,
        -4.381842178810183
    ]
}
run_policy invoked
Encountered a terminal state
Sum of r

In [21]:
# Video of the evaluation run
embed_mp4(eval_videos['QLearningPolicy2'])

In [22]:
# Video of the evaluation run
embed_mp4(eval_videos['SarsaPolicy2'])

# Notes

### Heuristic
* Currently we solve the game with a preprocessing heuristic that checks whether the bird is above or below a target line located 9 pixels below the middle of the next gap.
* This implies a 2x2 Q-table (2 states, 2 actions), which is very easy to learn.
* Following the heuristic appears to solve the game (10+ pipes) with the default distance between pipes.
* However, this seems to hold only for the initial environment we get with seed 42. On subsequent iterations, when the environment is rendered differently, the heuristic doesn't perform as well.
* This calls for more elaborate preprocessing that outputs something more than a single boolean.
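
The heuristic above can be sketched as a small function. This is only an illustration, not the notebook's actual `preprocess`: the function name, its arguments, and the assumption that screen y grows downward (so "below" means a larger y) are all hypothetical.

```python
def below_target(player_y, gap_top_y, gap_bottom_y, offset=9):
    """Return 1 if the bird is below the target line, else 0.

    Assumption: screen coordinates where y grows downward,
    so "below the target" means a larger y value.
    """
    gap_middle = (gap_top_y + gap_bottom_y) / 2
    target_y = gap_middle + offset  # 9 pixels below the middle of the gap
    return int(player_y > target_y)
```

The single returned bit is the whole state, which is why the Q-table has only two rows.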

### Improved preprocessing
* Since we have only 2 actions, the Q-table is expected to contain 2 columns.
* The number of rows is the number of options we have _after_ we perform the preprocessing.
* We should strive to increase the table's size incrementally, without jumping to big tables (which are harder to learn).
* Perhaps we can start by replacing the boolean flag with a categorical variable. E.g., instead of 0 and 1 signifying whether we're above or below the middle of the gap (minus 9), we can encode the distance as `distance from the middle of the gap // 5`, clamped to the range -20 to 19. In practice this quantizes the distance from the middle of the gap into buckets of 5 pixels and caps it at 19 and -20. We'll have 40 rows instead of 2, a 20-fold increase in the number of states. This will require running many more episodes when learning.
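
A minimal sketch of the quantization described above, assuming the signed pixel distance from the middle of the gap is already available. The names `quantize_distance` and `row_index` are hypothetical helpers, not part of the notebook.

```python
def quantize_distance(distance_px, bucket_px=5, low=-20, high=19):
    """Map a signed pixel distance to one of 40 buckets in [low, high]."""
    bucket = int(distance_px // bucket_px)  # floor division: -1 px falls in bucket -1
    return max(low, min(high, bucket))      # clamp to the allowed range


def row_index(bucket, low=-20):
    """Shift a bucket in [-20, 19] to a Q-table row index in [0, 39]."""
    return bucket - low
```

Because floor division rounds toward negative infinity, distances of -5 to -1 pixels share bucket -1 and distances of 0 to 4 pixels share bucket 0, so every bucket covers exactly 5 pixels.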

### Q-learning
* Currently, the implementation does involve learning, but not Q-learning.
* We still need to implement the Q-learning algorithm.
* We are already set up for a easier
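
For reference, the standard tabular Q-learning update could look roughly like the sketch below. It assumes the Q-table is a dict that maps a state key (such as `"0_1_0_0_1"`) to a list with one value per action, which matches the JSON layout printed above. The function name and the default values of `gamma` and `learning_rate` are placeholders, not the notebook's actual hyperparameters.

```python
def q_learning_update(q_table, state, action, reward, next_state, done,
                      gamma=0.9, learning_rate=0.1, n_actions=2):
    """Apply one Q-learning step in place and return the updated value.

    Q(s, a) <- Q(s, a) + lr * (r + gamma * max_a' Q(s', a') - Q(s, a))
    """
    q_values = q_table.setdefault(state, [0.0] * n_actions)
    next_values = q_table.get(next_state, [0.0] * n_actions)
    # No bootstrapping from the next state once the episode has ended.
    target = reward if done else reward + gamma * max(next_values)
    q_values[action] += learning_rate * (target - q_values[action])
    return q_values[action]
```

The `max` over the next state's values is what makes this off-policy Q-learning. Replacing it with the value of the action actually taken next would give SARSA instead.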

### Monte Carlo
* After we implement Q-learning and improve our preprocessing step so that we can learn a policy that is effective for all (or most) possible environments, we can start thinking about implementing Monte Carlo.

### Next steps
1. Implement the Q-learning policy update on the current binary observation state.
2. Increase epsilon to 0.1. It is currently set to 0 for testing.
3. Make sure the new policy update converges properly.
4. Run the evaluation on 5+ episodes to make sure Q-learning on the trivial state space is working.
5. Experiment with increasing the state space by updating the `preprocess` function.
6. Clean up the code.
7. Add a class for Monte Carlo policy learning
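
The multi-episode evaluation mentioned in step 4 could summarize its results along these lines. This is a sketch under the assumption that each evaluation run yields the number of pipes passed. `summarize_runs` is a hypothetical helper, and collecting the per-episode counts from `run_policy` is left out because its return value is a video here.

```python
from statistics import mean


def summarize_runs(pipes_per_episode, threshold=10):
    """Summarize evaluation runs given the pipes passed in each episode."""
    if not pipes_per_episode:
        raise ValueError("need at least one episode")
    passed = sum(1 for pipes in pipes_per_episode if pipes >= threshold)
    return {
        "episodes": len(pipes_per_episode),
        "mean_pipes": mean(pipes_per_episode),
        "min_pipes": min(pipes_per_episode),
        # Percentage of runs that passed at least `threshold` pipes.
        "pass_rate_pct": 100.0 * passed / len(pipes_per_episode),
    }
```

Reporting the minimum alongside the mean and pass rate helps show whether a policy is consistent or only occasionally lucky.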