<a href="https://colab.research.google.com/github/mohamedyosef101/101_learning_area/blob/area/Reinforcement%20Learning/ConnectX/03_deepRL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Deep** Reinforcement Learning

Source: the *Deep Reinforcement Learning* tutorial from Kaggle's [Intro to Game AI & RL](https://www.kaggle.com/code/alexisbcook/deep-reinforcement-learning/tutorial) course.

In [1]:
%%capture
!pip install kaggle_environments

In [2]:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import gym
from gym import spaces
from kaggle_environments import make, evaluate

# Create the environment

In [3]:
class ConnectFourGym(gym.Env):
  """Gym-style wrapper around the Kaggle "connectx" environment.

  The learning agent occupies the first slot of the trainer (the `None`
  entry) and `agent2` occupies the second. Observations are the flat board
  reshaped to `(1, rows, columns)`; actions are column indices.
  """

  def __init__(self, agent2="random"):
    """Create the Kaggle environment and declare the action/observation spaces.

    Args:
      agent2: Opponent forwarded to the Kaggle trainer (default "random").
    """
    kaggle_env = make("connectx", debug=True)
    self.env = kaggle_env.train([None, agent2])
    board_config = kaggle_env.configuration
    self.rows = board_config.rows
    self.columns = board_config.columns
    # One discrete action per column.
    self.action_space = spaces.Discrete(self.columns)
    # Cell values range over 0..2, laid out as one channel of rows x columns.
    self.observation_space = spaces.Box(
        low=0,
        high=2,
        shape=(1, self.rows, self.columns),
        dtype=int,
    )
    # Lowest reward is the invalid-move penalty, highest is the reward for 1.
    self.reward_range = (-10, 1)
    self.spec = None
    self.metadata = None

  def _as_observation(self):
    """Return the current board as an array of shape (1, rows, columns)."""
    flat_board = np.array(self.obs['board'])
    return flat_board.reshape(1, self.rows, self.columns)

  def reset(self):
    """Start a new game and return the initial observation."""
    self.obs = self.env.reset()
    return self._as_observation()

  def change_reward(self, old_reward, done):
    """Translate the Kaggle reward into the training reward.

    Returns 1 when the Kaggle reward is 1, -1 when the episode ended with any
    other reward, and 1/(rows*columns) for every other step.
    """
    if old_reward == 1:
      return 1
    if done:
      return -1
    return 1/(self.rows*self.columns)

  def step(self, action):
    """Play `action` (a column index) and return (obs, reward, done, info).

    If the board cell at flat index `action` (first row of the reshaped
    board) is not 0, the move is treated as invalid: the underlying
    environment is not stepped, the reward is -10 and the episode ends.
    """
    column = int(action)
    if self.obs['board'][column] == 0:
      self.obs, raw_reward, done, info = self.env.step(column)
      reward = self.change_reward(raw_reward, done)
    else:
      # Invalid move: penalise and terminate without touching the Kaggle env.
      reward, done, info = -10, True, {}
    return self._as_observation(), reward, done, info

In [5]:
# Training environment; "random" is forwarded to the Kaggle trainer as the opponent.
env = ConnectFourGym(agent2="random")

# Neural Network Architecture

In [6]:
%%capture
!pip install "stable-baselines3"

In [8]:
import torch
import torch.nn as nn
from stable_baselines3 import PPO as ppo
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor as bfe

In [11]:
class CustomCNN(bfe):
  """Two-layer convolutional feature extractor followed by a linear head.

  Args:
    observation_space: Box space; its first shape dimension is used as the
      number of input channels.
    features_dim: Width of the feature vector produced by the linear head.
  """

  def __init__(self, observation_space: gym.spaces.Box,
               features_dim: int=128):
    super().__init__(observation_space, features_dim)
    n_input_channels = observation_space.shape[0]

    self.cnn = nn.Sequential(
        nn.Conv2d(n_input_channels, 32, kernel_size=3),
        nn.ReLU(),
        nn.Conv2d(32, 64, kernel_size=3),
        nn.ReLU(),
        nn.Flatten(),
    )

    # Push one sampled observation (with a batch axis added) through the conv
    # stack to find the flattened width. Only this probe needs no_grad.
    with torch.no_grad():
      sample = torch.as_tensor(observation_space.sample()[None]).float()
      n_flatten = self.cnn(sample).shape[1]

    # Built outside the no_grad block: it is regular trainable model state.
    self.linear = nn.Sequential(
        nn.Linear(n_flatten, features_dim), nn.ReLU()
    )

  def forward(self, observations: torch.Tensor) -> torch.Tensor:
    """Map a batch of observations to feature vectors of width `features_dim`."""
    return self.linear(self.cnn(observations))

In [12]:
# Initialize agent: PPO with "CnnPolicy", using CustomCNN as the feature extractor
policy_kwargs = {"features_extractor_class": CustomCNN}
model = ppo(
    "CnnPolicy",
    env,
    policy_kwargs=policy_kwargs,
    verbose=0,
)



In [None]:
# Train agent for 60,000 timesteps
model.learn(total_timesteps=60_000)

# Define our agent

In [None]:
def agent1(obs, config):
  """Choose a column using the trained `model`, falling back to a random open column.

  Args:
    obs: Observation exposing the flat board under the 'board' key.
    config: Game configuration exposing `rows` and `columns`.

  Returns:
    int: The column to play.
  """
  board = obs['board']
  # Shape the flat board the same way the training environment does.
  grid = np.array(board).reshape(1, config.rows, config.columns)
  col, _ = model.predict(grid)
  col = int(col)

  # A column is playable while its first-row cell (flat index == column) is 0.
  if board[col] == 0:
    return col

  # The model picked a full column: choose uniformly among the open ones.
  open_columns = [c for c in range(config.columns) if board[c] == 0]
  return random.choice(open_columns)

In [None]:
# Play one game against the "random" agent and render it.
# A separate name is used so the Gym training environment bound to `env`
# is not overwritten (re-running the model-initialisation cell stays safe).
game_env = make("connectx")
game_env.run([agent1, "random"])
game_env.render(mode="ipython")

# Evaluation

In [None]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    """Play ConnectX games between two agents and print a summary.

    Half of the rounds (rounded down) are played with `agent1` listed first
    and the remainder with `agent2` listed first. Outcomes from the second
    batch are flipped so every pair reads [agent1 reward, agent2 reward].
    The printed "win percentages" are fractions of all games, rounded to two
    decimals. Nothing is returned.
    """
    # Use default Connect Four setup
    board_setup = {'rows': 6, 'columns': 7, 'inarow': 4}
    games_listed_first = n_rounds // 2
    games_listed_second = n_rounds - games_listed_first

    # Agent 1 goes first (roughly) half the time
    results = evaluate("connectx", [agent1, agent2], board_setup, [], games_listed_first)
    # Agent 2 goes first (roughly) half the time
    swapped = evaluate("connectx", [agent2, agent1], board_setup, [], games_listed_second)
    results += [[agent1_reward, agent2_reward]
                for agent2_reward, agent1_reward in swapped]

    total_games = len(results)
    agent1_wins = results.count([1, -1])
    agent2_wins = results.count([-1, 1])
    print("Agent 1 Win Percentage:", np.round(agent1_wins / total_games, 2))
    print("Agent 2 Win Percentage:", np.round(agent2_wins / total_games, 2))
    print("Number of Invalid Plays by Agent 1:", results.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", results.count([0, None]))

In [None]:
# Evaluate the trained agent against the "random" opponent (n_rounds left at its default).
get_win_percentages(agent1=agent1, agent2="random")