<a href="https://colab.research.google.com/github/kirwarobert/cnn/blob/main/Deep_Reinforcing_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction
So far, our agents have relied on detailed information about how to play the game. The heuristic really provides a lot of guidance about how to select moves!

In this tutorial, you'll learn how to use reinforcement learning to build an intelligent agent without the use of a heuristic. Instead, we will gradually refine the agent's strategy over time, simply by playing the game and trying to maximize the winning rate.

In this notebook, we won't be able to explore this complex field in detail, but you'll learn about the big picture and explore code that you can use to train your own agent.

# Neural Networks
It's difficult to come up with a perfect heuristic. Improving the heuristic generally entails playing the game many times, to determine specific cases where the agent could have made better choices. And, it can prove challenging to interpret what exactly is going wrong, and ultimately to fix old mistakes without accidentally introducing new ones.

Wouldn't it be much easier if we had a more systematic way of improving the agent with gameplay experience?

In this tutorial, towards this goal, we'll replace the heuristic with a neural network.

The network accepts the current board as input. And, it outputs a probability for each possible move.

Then, the agent selects a move by sampling from these probabilities. For instance, for the game board in the image above, the agent selects column 4 with 50% probability.
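
As a rough illustration of the sampling step, here is a minimal sketch that uses only the standard library. The probabilities in `move_probs` are made up for this example (they are not the output of a real network), and `sample_move` is a hypothetical helper name.

```python
import random

def sample_move(probabilities, rng=random):
    """Pick a column index at random, weighted by the given probabilities."""
    columns = list(range(len(probabilities)))
    # random.choices draws according to the supplied weights
    return rng.choices(columns, weights=probabilities, k=1)[0]

# Hypothetical network output for a 7-column board (values sum to 1)
move_probs = [0.05, 0.05, 0.10, 0.20, 0.50, 0.05, 0.05]

rng = random.Random(0)  # fixed seed so the run is repeatable
samples = [sample_move(move_probs, rng) for _ in range(10_000)]
share_col_4 = samples.count(4) / len(samples)
print(f"Column 4 was chosen in {share_col_4:.0%} of samples")
```

Over many samples, the share of games in which column 4 is picked should land close to its assigned probability, while the other columns still get tried occasionally.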

This way, to encode a good gameplay strategy, we need only amend the weights of the network so that for every possible game board, it assigns higher probabilities to better moves.

At least in theory, that's our goal. In practice, we won't actually check whether that's the case: remember that Connect Four has over 4 trillion possible game boards!

# Setup
How can we approach the task of amending the weights of the network, in practice? Here's the approach we'll take in this lesson:

- After each move, we give the agent a reward that tells it how well it did:
  - If the agent wins the game in that move, we give it a reward of +1.
  - Else if the agent plays an invalid move (which ends the game), we give it a reward of -10.
  - Else if the opponent wins the game in its next move (i.e., the agent failed to prevent its opponent from winning), we give the agent a reward of -1.
  - Else, the agent gets a reward of 1/42.
- At the end of each game, the agent adds up its reward. We refer to the sum of rewards as the agent's cumulative reward.
  - For instance, if the game lasted 8 moves (each player played four times), and the agent ultimately won, then its cumulative reward is 3*(1/42) + 1.
  - If the game lasted 11 moves (and the opponent went first, so the agent played five times), and the opponent won in its final move, then the agent's cumulative reward is 4*(1/42) - 1.
  - If the game ends in a draw, then the agent played exactly 21 moves, and it gets a cumulative reward of 21*(1/42).
  - If the game lasted 7 moves and ended with the agent selecting an invalid move, the agent gets a cumulative reward of 3*(1/42) - 10.

Our goal is to find the weights of the neural network that (on average) maximize the agent's cumulative reward.
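
The worked examples above can be reproduced with a few lines of arithmetic. This is only a sketch of the reward scheme as described in the text; the helper `cumulative_reward` and the outcome labels are names chosen for this example, and exact fractions are used to avoid rounding.

```python
from fractions import Fraction

STEP_BONUS = Fraction(1, 42)   # reward for a move that does not end the game
FINAL_REWARD = {"win": 1, "loss": -1, "invalid": -10}

def cumulative_reward(agent_moves, outcome):
    """Sum the rewards for a game in which the agent played `agent_moves` times.

    `outcome` is one of "win", "loss", "invalid" or "draw".
    """
    if outcome == "draw":
        # Every one of the agent's moves earns the small bonus
        return agent_moves * STEP_BONUS
    # All moves but the last earn the bonus; the last earns the final reward
    return (agent_moves - 1) * STEP_BONUS + FINAL_REWARD[outcome]

print(cumulative_reward(4, "win"))       # 3*(1/42) + 1
print(cumulative_reward(5, "loss"))      # 4*(1/42) - 1
print(cumulative_reward(21, "draw"))     # 21*(1/42)
print(cumulative_reward(4, "invalid"))   # 3*(1/42) - 10
```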

This idea of using reward to track the performance of an agent is a core idea in the field of reinforcement learning. Once we define the problem in this way, we can use any of a variety of reinforcement learning algorithms to produce an agent.

# Reinforcement Learning
There are many different reinforcement learning algorithms, such as DQN, A2C, and PPO, among others. All of these algorithms use a similar process to produce an agent:

- Initially, the weights are set to random values.
- As the agent plays the game, the algorithm continually tries out new values for the weights, to see how the cumulative reward is affected, on average. Over time, after playing many games, we get a good idea of how the weights affect cumulative reward, and the algorithm settles towards weights that performed better.
  - Of course, we have glossed over the details here, and there's a lot of complexity involved in this process. For now, we focus on the big picture!
- This way, we'll end up with an agent that tries to win the game (so it gets the final reward of +1, and avoids the -1 and -10) and tries to make the game last as long as possible (so that it collects the 1/42 bonus as many times as it can).
  - You might argue that it doesn't really make sense to want the game to last as long as possible -- this might result in a very inefficient agent that doesn't play obvious winning moves early in gameplay. And, your intuition would be correct -- this will make the agent take longer to play a winning move! The reason we include the 1/42 bonus is to help the algorithms we'll use converge better. Further discussion is outside of the scope of this course, but you can learn more by reading about the "temporal credit assignment problem" and "reward shaping".

In the next section, we'll use the Proximal Policy Optimization (PPO) algorithm to create an agent.
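
To make the "try new weights, keep what works" idea concrete, here is a deliberately simplified sketch. It is not PPO (or DQN or A2C): it is a plain random search over a single weight on a made-up task, where action 1 earns reward 1 and action 0 earns nothing. All names and numbers are assumptions chosen for illustration.

```python
import math
import random

def average_reward(weight, rng, n_games=200):
    """Estimate the mean reward of a one-weight policy on a toy task.

    The policy picks action 1 with probability sigmoid(weight); action 1
    earns reward 1 and action 0 earns reward 0.
    """
    p_good = 1 / (1 + math.exp(-weight))
    wins = sum(1 for _ in range(n_games) if rng.random() < p_good)
    return wins / n_games

rng = random.Random(0)
weight = rng.uniform(-1, 1)                  # start from a random weight
initial_score = best_score = average_reward(weight, rng)

for _ in range(100):
    candidate = weight + rng.gauss(0, 0.5)   # try a nearby weight
    score = average_reward(candidate, rng)
    if score > best_score:                   # keep it only if it did better
        weight, best_score = candidate, score

print(f"average reward: {initial_score:.2f} -> {best_score:.2f}")
```

Real algorithms are far more sample-efficient than this, but the loop has the same shape: play games, measure average reward, and move the weights towards values that performed better.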

# Code
There are a lot of great implementations of reinforcement learning algorithms online. In this course, we'll use Stable-Baselines3.

There's a bit of extra work that we need to do to make the environment compatible with Stable-Baselines3. For this, we define the ConnectFourGym class below. This class implements ConnectX as an OpenAI Gym environment and defines several methods:

- reset() will be called at the beginning of every game. It returns the starting game board as a numpy array of shape (1, 6, 7): a single channel with 6 rows and 7 columns.
- change_reward() customizes the rewards that the agent receives. (The competition already has its own system for rewards that are used to rank the agents, and this method changes the values to match the rewards system we designed.)
- step() is used to play the agent's choice of action (supplied as action), along with the opponent's response. It returns:
  - the resulting game board (as a numpy array),
  - the agent's reward (from the most recent move only: one of +1, -10, -1, or 1/42), and
  - whether or not the game has ended (if the game has ended, done=True; otherwise, done=False).

To learn more about how to define environments, check out the documentation here.
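
ConnectX hands the board to the agent as a flat list of 42 integers in row-major order, with the top row first. That is why the class reshapes the board, and why checking a single entry is enough to tell whether a column is full. The sketch below shows both ideas in plain Python; `to_grid` and `valid_columns` are hypothetical helper names, and the board position is invented.

```python
ROWS, COLUMNS = 6, 7

def to_grid(flat_board, rows=ROWS, columns=COLUMNS):
    """Split a flat, row-major board into a list of rows (top row first)."""
    return [flat_board[r * columns:(r + 1) * columns] for r in range(rows)]

def valid_columns(flat_board, columns=COLUMNS):
    """A column accepts a piece as long as its top cell is still empty (0)."""
    return [c for c in range(columns) if flat_board[c] == 0]

# Hypothetical position: column 3 is completely full, everything else is empty
board = [0] * (ROWS * COLUMNS)
for r in range(ROWS):
    board[r * COLUMNS + 3] = 1 if r % 2 == 0 else 2

grid = to_grid(board)
print(len(grid), len(grid[0]))   # number of rows, number of columns
print(valid_columns(board))      # every column except the full one
```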

In [2]:
!pip install kaggle-environments
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import gym
from kaggle_environments import make, evaluate
from gym import spaces

class ConnectFourGym(gym.Env):
    def __init__(self, agent2="random"):
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        self.observation_space = spaces.Box(low=0, high=2,
                                            shape=(1,self.rows,self.columns), dtype=int)
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-10, 1)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None
    def reset(self):
        self.obs = self.env.reset()
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns)
    def change_reward(self, old_reward, done):
        if old_reward == 1: # The agent won the game
            return 1
        elif done: # The opponent won the game
            return -1
        else: # Reward 1/42
            return 1/(self.rows*self.columns)
    def step(self, action):
        # Check if agent's move is valid
        is_valid = (self.obs['board'][int(action)] == 0)
        if is_valid: # Play the move
            self.obs, old_reward, done, _ = self.env.step(int(action))
            reward = self.change_reward(old_reward, done)
        else: # End the game and penalize agent
            reward, done, _ = -10, True, {}
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns), reward, done, _

In this notebook, we'll train an agent to beat the random agent. We specify this opponent in the agent2 argument below.

In [3]:
# Create ConnectFour environment
env = ConnectFourGym(agent2="random")

The next step is to specify the architecture of the neural network. In this case, we use a convolutional neural network. To learn more about how to specify architectures with Stable-Baselines3, check out the documentation here.

Note that this is the neural network that outputs the probabilities of selecting each column. Since we use the PPO algorithm (PPO in the code cell below), our network will also output some additional information (called the "value" of the input). This is outside the scope of this course, but you can learn more by reading about "actor-critic networks".

In [4]:
import torch as th
import torch.nn as nn

!pip install "stable-baselines3"
from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

# Neural network for predicting action values
class CustomCNN(BaseFeaturesExtractor):

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int=128):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        # CxHxW images (channels first)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 32, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with th.no_grad():
            n_flatten = self.cnn(
                th.as_tensor(observation_space.sample()[None]).float()
            ).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.linear(self.cnn(observations))

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
)

# Initialize agent
model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, verbose=0)





In the code cell above, the weights of the neural network are initially set to random values.
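
As a side check on the architecture, the size of the flattened feature vector that `CustomCNN` discovers with a forward pass (`n_flatten`) can also be worked out by hand. The sketch below applies the standard convolution output-size formula to the two `Conv2d` layers (kernel size 3, stride 1, no padding) on a 6x7 board; `conv_output_size` is a helper written for this example.

```python
def conv_output_size(size, kernel_size=3, stride=1, padding=0):
    """Output length of one spatial dimension after a convolution."""
    return (size + 2 * padding - kernel_size) // stride + 1

height, width = 6, 7           # Connect Four board
channels = 1                   # single input channel

for out_channels in (32, 64):  # the two Conv2d layers in CustomCNN
    height = conv_output_size(height)
    width = conv_output_size(width)
    channels = out_channels
    print(f"after conv: {channels} x {height} x {width}")

n_flatten = channels * height * width
print("n_flatten =", n_flatten)
```

Each unpadded 3x3 convolution trims two cells from each dimension, so the board shrinks from 6x7 to 4x5 and then to 2x3 before being flattened and passed to the linear layer.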

In the next code cell, we "train the agent", which is just another way of saying that we find weights of the neural network that are likely to result in the agent selecting good moves.

In [5]:
# Train agent
model.learn(total_timesteps=60000)

<stable_baselines3.ppo.ppo.PPO at 0x7ab168105150>

Finally, we specify the trained agent in the format required for the competition.

In [6]:
def agent1(obs, config):
    # Use the trained model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(1, config.rows, config.columns))
    # Check if selected column is valid (its top cell must be empty)
    is_valid = (obs['board'][int(col)] == 0)
    # If not valid, fall back to a random valid column
    if is_valid:
        return int(col)
    else:
        return random.choice([c for c in range(config.columns) if obs['board'][c] == 0])

In the next code cell, we see the outcome of one game round against a random agent.

In [7]:
# Create the game environment
env = make("connectx")

# The trained agent plays one game round against the random agent
env.run([agent1, "random"])

# Show the game
env.render(mode="ipython")

We then calculate how it performs, on average, against the random agent.

In [8]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    # Agent 1 goes first (roughly) half the time
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)
    # Agent 2 goes first (roughly) half the time
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds-n_rounds//2)]
    print("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/len(outcomes), 2))
    print("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/len(outcomes), 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [9]:
get_win_percentages(agent1=agent1, agent2="random")

Agent 1 Win Percentage: 0.77
Agent 2 Win Percentage: 0.23
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0
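
To make the bookkeeping inside `get_win_percentages` easier to follow, here is a small sketch that runs the same counting logic on hand-written outcomes. The outcome lists are invented stand-ins for what `evaluate()` returns, and `win_rates` is a hypothetical helper.

```python
def win_rates(outcomes):
    """Return (agent 1 win rate, agent 2 win rate) from [reward1, reward2] pairs."""
    n = len(outcomes)
    return outcomes.count([1, -1]) / n, outcomes.count([-1, 1]) / n

# Hand-written outcomes standing in for the output of evaluate()
first_half = [[1, -1], [1, -1], [-1, 1]]   # agent 1 moved first
second_half = [[-1, 1], [1, -1]]           # agent 2 moved first: [agent2, agent1]

# Swap each pair in the second half so agent 1's reward always comes first
outcomes = first_half + [[b, a] for [a, b] in second_half]

agent1_rate, agent2_rate = win_rates(outcomes)
print(agent1_rate, agent2_rate)
```

The swap matters because `evaluate()` reports rewards in the order the agents were passed in, so without it the games where agent 2 moved first would be credited to the wrong player.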
