# Human vs bot game

In this notebook, we give a tutorial on how to run a pygame between two differing agents, where there is an option to be the agent in a game yourself.
Thus, this notebook guides you in using a bot as the opponent in the connect four game.

<hr><hr>

## Table of Contents

- Contact information
- Checking requirements
  - Correct Anaconda environment
  - Correct module access
  - Correct CUDA access
- Setup the game

<hr><hr>

## Contact information

| Name             | Student ID | VUB mail                                                  | Personal mail                                               |
| ---------------- | ---------- | --------------------------------------------------------- | ----------------------------------------------------------- |
| Lennert Bontinck | 0568702    | [lennert.bontinck@vub.be](mailto:lennert.bontinck@vub.be) | [info@lennertbontinck.com](mailto:info@lennertbontinck.com) |



<hr><hr>

## Checking requirements

### Correct Anaconda environment

The `rl-project` anaconda environment should be active to ensure proper support. Installation instructions are available on [the GitHub repository of the RL course project and homeworks](https://github.com/pikawika/vub-rl).

In [1]:
####################################################
# CHECKING FOR RIGHT ANACONDA ENVIRONMENT
####################################################

import os
from platform import python_version

print(f"Active environment: {os.environ['CONDA_DEFAULT_ENV']}")
print(f"Correct environment: {os.environ['CONDA_DEFAULT_ENV'] == 'rl-project'}")
print(f"\nPython version: {python_version()}")
print(f"Correct Python version: {python_version() == '3.8.10'}")

Active environment: rl-project
Correct environment: True

Python version: 3.8.10
Correct Python version: True


<hr>

### Correct module access

The following code block will load in all required modules and show if the versions match those that are recommended.

In [3]:
####################################################
# LOADING MODULES
####################################################

# Allow reloading of libraries
import importlib

# Tianshou for RL algorithms
import tianshou as ts; print(f"Tianshou version (0.4.8 recommended): {ts.__version__}")

# Torch is a popular DL framework
import torch; print(f"Torch version (1.11.0 recommended): {torch.__version__}")

# Our custom connect four gym environment
import sys
sys.path.append('../')
import human_vs_bot_connect4.human_vs_bot_connect_four as game
importlib.invalidate_caches();
importlib.reload(game);

# More data types
import typing
import numpy as np


Tianshou version (0.4.8 recommended): 0.4.8
Torch version (1.11.0 recommended): 1.12.0.dev20220520+cu116


<hr>

### Correct CUDA access

The installation instructions specify how to install PyTorch with CUDA 11.6.
The following code block tests if this was done successfully.

In [4]:
####################################################
# CUDA VALIDATION
####################################################

# Check cuda available
print(f"CUDA is available: {torch.cuda.is_available()}")

# Show cuda devices
print(f"\nAmount of connected devices supporting CUDA: {torch.cuda.device_count()}")

# Show current cuda device
print(f"\nCurrent CUDA device: {torch.cuda.current_device()}")

# Show cuda device name
print(f"Cuda device 0 name: {torch.cuda.get_device_name(0)}")

CUDA is available: True

Amount of connected devices supporting CUDA: 1

Current CUDA device: 0
Cuda device 0 name: NVIDIA GeForce GTX 970


<hr><hr>

## Policies

We need to specify the policies the trained agent's weight are for.



In [5]:
####################################################
# DQN POLICY FROM PAPER NOTEBOOK 5
####################################################

class CustomDQN(torch.nn.Module):
    """
    Custom DQN using a model based on CNN
    """
    def __init__(self,
                 state_shape: typing.Sequence[int],
                 action_shape: typing.Sequence[int],
                 device: typing.Union[str, int, torch.device] = 'cuda' if torch.cuda.is_available() else 'cpu',):
        # Parent call
        super().__init__()
        
        # Save device (e.g. cuda)
        self.device = device
        
        self.model = torch.nn.Sequential(
            torch.nn.Linear(np.prod(state_shape), 128), torch.nn.ReLU(inplace=True),
            torch.nn.Linear(128, 128), torch.nn.ReLU(inplace=True),
            torch.nn.Linear(128, 128), torch.nn.ReLU(inplace=True),
            torch.nn.Linear(128, np.prod(action_shape)),
        )

    def forward(self, obs, state=None, info={}):
        if not isinstance(obs, torch.Tensor):
            obs = torch.tensor(obs, dtype=torch.float, device=self.device)
        batch = obs.shape[0]
        logits = self.model(obs.view(batch, -1))
        return logits, state

def cf_custom_dqn_policy(state_shape: tuple,
                         action_shape: tuple,
                         learning_rate: float =  0.0001,
                         gamma: float = 0.9, # Smaller gamma favours "faster" win
                         n_step: int = 1, # Number of steps to look ahead
                         target_update_freq: int = 320):
    # Use cuda device if possible
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    # Network to be used for DQN
    net = CustomDQN(state_shape, action_shape, device= device).to(device)
    
    # Default optimizer is an adam optimizer with the argparser learning rate
    optim = torch.optim.Adam(net.parameters(), lr= learning_rate)
        
    # Our agent DQN policy
    return ts.policy.DQNPolicy(model= net,
                               optim= optim,
                               discount_factor= gamma,
                               estimation_step= n_step,
                               target_update_freq= target_update_freq)

In [6]:
####################################################
# DQN POLICY FROM PAPER NOTEBOOK 7
####################################################

class CNNBasedDQN(torch.nn.Module):
    """
    Custom DQN using a model based on CNN
    """
    def __init__(self,
                 state_shape: typing.Sequence[int],
                 action_shape: typing.Sequence[int],
                 device: typing.Union[str, int, torch.device] = 'cuda' if torch.cuda.is_available() else 'cpu',):
        # Parent call
        super().__init__()
        
        # Save device (e.g. cuda)
        self.device = device
        
        # Number of input channels
        input_channels_cnn = 1
        output_channels_cnn = 32
        flatten_size = (state_shape[0] - 3) * (state_shape[1] - 3) * output_channels_cnn
        output_size= np.prod(action_shape)
        
        self.model = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels= input_channels_cnn, out_channels= output_channels_cnn, kernel_size= 4, stride= 1), torch.nn.ReLU(inplace=True),
            torch.nn.Flatten(0,-1),
            torch.nn.Unflatten(0, (1, flatten_size)),
            torch.nn.Linear(flatten_size, 128), torch.nn.ReLU(inplace=True),
            torch.nn.Linear(128, 128), torch.nn.ReLU(inplace=True),
            torch.nn.Linear(128, output_size),
        )

    def forward(self, obs, state=None, info={}):
        if not isinstance(obs, torch.Tensor):
            obs = torch.tensor(obs, dtype=torch.float, device=self.device)
        
        logits = self.model(obs)
        return logits, state

    
def cf_cnn_dqn_policy(state_shape: tuple,
                      action_shape: tuple,
                      optim: typing.Optional[torch.optim.Optimizer] = None,
                      learning_rate: float =  0.0001,
                      gamma: float = 0.9, # Smaller gamma favours "faster" win
                      n_step: int = 4, # Number of steps to look ahead
                      frozen: bool = False,
                      target_update_freq: int = 320):
    # Use cuda device if possible
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    # Network to be used for DQN
    net = CNNBasedDQN(state_shape, action_shape, device= device).to(device)
    
    # Default optimizer is an adam optimizer with the argparser learning rate
    if optim is None:
        optim = torch.optim.Adam(net.parameters(), lr= learning_rate)
        
    # If we are frozen, we use an optimizer that has learning rate 0
    if frozen:
        optim = torch.optim.SGD(net.parameters(), lr= 0)
        
        
    # Our agent DQN policy
    return ts.policy.DQNPolicy(model= net,
                               optim= optim,
                               discount_factor= gamma,
                               estimation_step= n_step,
                               target_update_freq= target_update_freq)

In [9]:
####################################################
# RAINBOW POLICY FROM PAPER NOTEBOOK 9
####################################################

class DQNForRainbow(torch.nn.Module):
    """
    Custom DQN to be used as baseclass for the Rainbow algorithm.
    """

    def __init__(
        self,
        c: int,
        h: int,
        w: int,
        action_shape: typing.Sequence[int],
        device: typing.Union[str, int, torch.device] = "cpu",
        features_only: bool = False,
        output_dim: typing.Optional[int] = None,
    ) -> None:
        super().__init__()
        self.device = device
        
        # Number of input channels
        input_channels_cnn = 1
        output_channels_cnn = 32
        self.output_dim = (h - 3) * (w - 3) * output_channels_cnn
        
        self.net = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels= input_channels_cnn, out_channels= output_channels_cnn, kernel_size= 4, stride= 1), torch.nn.ReLU(inplace=True),
            torch.nn.Flatten(),
        )
        if not features_only:
            self.net = torch.nn.Sequential(
                self.net, torch.nn.Linear(self.output_dim, 512), torch.nn.ReLU(inplace=True),
                torch.nn.Linear(512, np.prod(action_shape))
            )
            self.output_dim = np.prod(action_shape)
        elif output_dim is not None:
            self.net = torch.nn.Sequential(
                self.net, torch.nn.Linear(self.output_dim, output_dim),
                torch.nn.ReLU(inplace=True)
            )
            self.output_dim = output_dim

    def forward(
        self,
        obs: typing.Union[np.ndarray, torch.Tensor],
        state: typing.Optional[typing.Any] = None,
        info: typing.Dict[str, typing.Any] = {},
    ) -> typing.Tuple[torch.Tensor, typing.Any]:
        r"""Mapping: s -> Q(s, \*)."""
        if not isinstance(obs, torch.Tensor):
            obs = torch.tensor(obs, dtype=torch.float, device=self.device)
            
        # Make right shape
        obs = obs[:, None, :, :]
        
        logits = self.net(obs)
        return logits, state


class Rainbow(DQNForRainbow):
    """
    Implementation of the Rainbow algorithm.
    """

    def __init__(
        self,
        c: int,
        h: int,
        w: int,
        action_shape: typing.Sequence[int],
        num_atoms: int = 51,
        noisy_std: float = 0.5,
        device: typing.Union[str, int, torch.device] = "cpu",
        is_dueling: bool = True,
        is_noisy: bool = True,
    ) -> None:
        super().__init__(c, h, w, action_shape, device, features_only=True)
        self.action_num = np.prod(action_shape)
        self.num_atoms = num_atoms

        def linear(x, y):
            if is_noisy:
                return ts.utils.net.discrete.NoisyLinear(x, y, noisy_std)
            else:
                return torch.nn.Linear(x, y)

        self.Q = torch.nn.Sequential(
            linear(self.output_dim, 512), torch.nn.ReLU(inplace=True),
            linear(512, self.action_num * self.num_atoms)
        )
        self._is_dueling = is_dueling
        if self._is_dueling:
            self.V = torch.nn.Sequential(
                linear(self.output_dim, 512), torch.nn.ReLU(inplace=True),
                linear(512, self.num_atoms)
            )
        self.output_dim = self.action_num * self.num_atoms

    def forward(
        self,
        obs: typing.Union[np.ndarray, torch.Tensor],
        state: typing.Optional[typing.Any] = None,
        info: typing.Dict[str, typing.Any] = {},
    ) -> typing.Tuple[torch.Tensor, typing.Any]:
        r"""Mapping: x -> Z(x, \*)."""
        obs, state = super().forward(obs)
        q = self.Q(obs)
        q = q.view(-1, self.action_num, self.num_atoms)
        if self._is_dueling:
            v = self.V(obs)
            v = v.view(-1, 1, self.num_atoms)
            logits = q - q.mean(dim=1, keepdim=True) + v
        else:
            logits = q
        probs = logits.softmax(dim=2)
        return probs, state
    
    
def rainbow_policy(state_shape: tuple,
                  action_shape: tuple,
                  optim: typing.Optional[torch.optim.Optimizer] = None,
                  learning_rate: float =  0.0000625,
                  gamma: float = 0.9, # Smaller gamma favours "faster" win
                  n_step: int = 1, # Number of steps to look ahead
                  num_atoms: int = 51,
                  target_update_freq: int = 500):
    
    # Use cuda device if possible
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    # Rainbow network to be used by policy
    net = Rainbow(c= 1,
                  h= state_shape[0],
                  w= state_shape[1],
                  action_shape= action_shape,                  
                  device= device).to(device)
    
    # Default optimizer is an adam optimizer with the argparser learning rate
    if optim is None:
        optim = torch.optim.Adam(net.parameters(), lr= learning_rate)
        
    # Our agent DQN policy
    return ts.policy.RainbowPolicy(model= net,
                                   optim= optim,
                                   discount_factor= gamma,
                                   num_atoms= num_atoms,
                                   estimation_step= n_step,
                                   target_update_freq= target_update_freq).to(device)

<hr><hr>

## Setup the game



In [11]:
####################################################
# SETUP THE GAME
####################################################

# Specify either an instance of an object that can predict a move or "me" for player 1
player1 = "me"

# Specify either an instance of an object that can predict a move or "me" for player 2
player2 = rainbow_policy(state_shape= (6, 7),
                         action_shape= (7,))
player2.load_state_dict(torch.load("../paper_notebooks/./saved_variables/paper_notebooks/9/rainbow_vs_rainbow/best_policy_agent2.pth"))

# Play the game
game.play_game(player1= player1,
               player2= player2)

In [41]:
####################################################
# REMOVE UNUSED VARIABLES
####################################################

del player1
del player2