In [None]:
import random
import numpy as np


import gym
from gym import spaces

from stable_baselines3 import PPO
from stable_baselines3.common.policies import BaseFeaturesExtractor

import torch
from torch import nn

from kaggle_environments import make, evaluate

from tool.check_win import check_win, get_position
from tool.check_three import check_three





# env.step
`env.step(action)` 的回傳值:`(obs, reward, done, _)`
## obs
- 環境回傳的觀測;本筆記本只使用其中的 `board` 欄位。
## reward
- 贏了是 1
- 平常(遊戲尚未結束)是 0
- 遊戲結束但 reward 不是 1 時,`change_reward` 一律當作輸來處理
## done
- 遊戲是否結束

In [None]:
# Build the OpenAI Gym environment for ConnectX
#code from: https://www.kaggle.com/code/alexisbcook/deep-reinforcement-learning
class ConnectFourGym(gym.Env):
    """Gym wrapper around the Kaggle ``connectx`` environment with a shaped reward.

    The learning agent is passed as ``None`` in the first slot of
    ``ks_env.train([None, agent2])``; ``agent2`` is the opponent. Observations
    are the flat Kaggle board reshaped to ``(1, rows, columns)`` so that a CNN
    policy can consume them.

    Args:
        agent2: Opponent passed to ``ks_env.train`` (a built-in agent name such
            as ``"random"`` or a callable ``agent(obs, config)``).
    """

    def __init__(self, agent2="random"):
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        self.observation_space = spaces.Box(low=0, high=2,
                                            shape=(1, self.rows, self.columns), dtype=int)
        # Tuple corresponding to the min and max possible rewards.
        # The lower bound is the invalid-move penalty handed out in `step` (-20),
        # the upper bound is the win reward from `change_reward` (+10).
        self.reward_range = (-20, 10)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None

    def reset(self):
        """Start a new game and return the initial board as ``(1, rows, columns)``."""
        self.obs = self.env.reset()
        return np.array(self.obs['board']).reshape(1, self.rows, self.columns)

    def change_reward(self, action: int, old_reward: int, done: bool):
        """Turn the raw Kaggle reward into the shaped training reward.

        Must be called while ``self.obs`` still holds the board from *before*
        ``action`` was played, because the heuristics inspect that board.

        Args:
            action: Column the agent just played.
            old_reward: Reward returned by the Kaggle environment for this step.
            done: Whether the game ended on this step.

        Returns:
            ``10`` for a win, ``-10`` for any other finished game, ``1`` when
            the move took a column in which ``check_win`` reports a win for
            player 2, ``0.5 * n`` when ``check_three`` reports ``n >= 2``, and
            ``0`` otherwise.
        """
        # Terminal step: anything other than a raw reward of 1 is treated as a loss.
        if done:
            return 10 if old_reward == 1 else -10

        column = int(action)
        board = list(np.array(self.obs['board']).reshape(-1))

        # Reward blocking: the opponent (player 2) was one piece away from four
        # in the column we just took. The original scanned a hard-coded
        # range(7) and kept only i == action, which is this single check
        # (assuming check_win does not modify the board it is given).
        if check_win(board, self.rows, self.columns, column, 2):
            return 1

        # Presumably the number of "one piece away from four" threats that this
        # move creates for player 1 -- confirm against tool.check_three.
        possible_three = check_three(board, self.rows, self.columns, column, 1)
        if possible_three >= 2:
            return possible_three * 0.5

        return 0

    def step(self, action):
        """Play ``action`` and return ``(observation, reward, done, info)``.

        Choosing a column whose top cell is occupied does not reach the Kaggle
        environment: the episode is ended immediately with a reward of ``-20``
        and the unchanged board is returned.
        """
        column = int(action)
        # The first `columns` entries of the flat board are the top row, so a
        # non-zero value there means the column is full.
        if self.obs['board'][column] == 0:
            new_obs, old_reward, done, info = self.env.step(column)
            # Shape the reward before overwriting self.obs: change_reward
            # inspects the pre-move board.
            reward = self.change_reward(column, old_reward, done)
            self.obs = new_obs
        else:
            # End the game and penalize the agent.
            reward, done, info = -20, True, {}
        return np.array(self.obs['board']).reshape(1, self.rows, self.columns), reward, done, info




In [None]:
# Custom CNN feature extractor so the agent can better pick up image-like (spatial) features of the board
class CustomCNNFeatureExtractor(BaseFeaturesExtractor):
    """Small convolutional features extractor for board-shaped observations.

    Two convolutions (3x3 with 32 filters, then 2x2 with 64 filters), each
    followed by ReLU, are flattened and projected by a linear layer with a
    LeakyReLU activation to a vector of ``features_dim`` values.

    Args:
        observation_space: Box space whose first axis is the channel count.
        features_dim: Size of the feature vector produced by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)

        obs_shape = observation_space.shape
        n_channels = obs_shape[0]
        self.output_shape = obs_shape[1]

        conv_stack = [
            nn.Conv2d(in_channels=n_channels, out_channels=32, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=2, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.input_cnn = nn.Sequential(*conv_stack)

        # Push one sampled observation (with a batch axis added) through the
        # conv stack to discover the flattened width the linear layer needs.
        with torch.no_grad():
            probe = torch.as_tensor(observation_space.sample()[None]).float()
            n_flatten = self.input_cnn(probe).shape[1]

        self.output = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.LeakyReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        """Map a batch of observations to their feature vectors."""
        return self.output(self.input_cnn(observations))



    

In [None]:

def opponent(obs, config):
    """Rule-based ConnectX agent: win if possible, otherwise block, otherwise random.

    The player ids are taken from ``obs.mark`` instead of being hard-coded, so
    the agent keeps the "own win first, then block" priority whether it moves
    first or second. When ``obs`` has no usable ``mark`` it falls back to
    playing as 2, which is the role this function had when it was hard-coded.

    Args:
        obs: Kaggle observation exposing ``board`` (flat list) and, normally, ``mark``.
        config: Kaggle configuration exposing ``rows`` and ``columns``.

    Returns:
        The chosen column index.

    Raises:
        IndexError: If no column is free (``random.choice`` on an empty list).
    """
    own_mark = getattr(obs, "mark", 2)
    if own_mark not in (1, 2):
        own_mark = 2
    rival_mark = 1 if own_mark == 2 else 2

    # A column is playable while its top cell (first `columns` entries) is empty.
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]

    # First pass looks for our own winning move, second pass for a move that
    # takes away the rival's winning move.
    for player in (own_mark, rival_mark):
        for col in valid_moves:
            if check_win(board=obs.board, row=config.rows, col=config.columns, choice=col, player=player):
                return col

    return random.choice(valid_moves)

In [None]:
# Plug the custom CNN into the policy as its features extractor.
policy_kwargs = {"features_extractor_class": CustomCNNFeatureExtractor}

# Train against the rule-based `opponent` instead of the default "random" agent.
env = ConnectFourGym(agent2=opponent)

model = PPO(
    "CnnPolicy",
    env,
    policy_kwargs=policy_kwargs,
    learning_rate=0.0001,
    n_steps=4096,
    batch_size=256,
    n_epochs=20,
    verbose=1,
)

# Disabled experiment: replace the policy/value heads of the MLP extractor
# with custom 256 -> 128 -> 64 networks and move them to the GPU.
# model.policy.mlp_extractor.policy_net = nn.Sequential(nn.Linear(256, 128),
#                                                     nn.LeakyReLU(),
#                                                     nn.Linear(128, 64,),
#                                                     nn.Tanh())
# model.policy.mlp_extractor.value_net = nn.Sequential(nn.Linear(256, 128),
#                                                     nn.LeakyReLU(),
#                                                     nn.Linear(128, 64,),
#                                                     nn.Tanh())
# model.policy.mlp_extractor.policy_net = \
#     model.policy.mlp_extractor.policy_net.to(device="cuda", dtype=torch.float32)
# model.policy.mlp_extractor.value_net = \
#     model.policy.mlp_extractor.value_net.to(device="cuda", dtype=torch.float32)

# Show the resulting network architecture as the cell output.
model.policy

In [None]:
# Train the PPO agent for a total of 204,800 environment steps.
model.learn(total_timesteps=204800)

In [None]:
# Module-level handle on the trained network; `use_policy` reads this global.
policy = model.policy
# Win counter; only referenced by the commented-out debugging code in `use_policy`.
win_p = 0
def use_policy(obs, config):
    """Kaggle-style agent that plays the column chosen by the trained policy.

    Reads the module-level ``policy``. The input tensor is created on the
    device the policy lives on (instead of a hard-coded ``"cuda"``), so the
    agent also works on CPU-only machines, and the forward pass runs under
    ``torch.no_grad()`` because no gradients are needed for inference.

    Args:
        obs: Kaggle observation; only ``obs["board"]`` is used.
        config: Kaggle configuration exposing ``rows`` and ``columns``.

    Returns:
        The column index predicted by the policy, as a Python ``int``.
    """
    # Shape the flat board the way the network was trained:
    # (batch=1, channel=1, rows, columns).
    board = np.array(obs["board"])
    board = board.reshape(1, 1, config.rows, config.columns)
    board_tensor = torch.as_tensor(board, dtype=torch.float32, device=policy.device)

    # The first element of the forward output is the chosen action.
    # NOTE(review): forward is called with its defaults; in Stable-Baselines3
    # that presumably samples the action rather than taking the argmax -- confirm.
    with torch.no_grad():
        predict = int(policy.forward(board_tensor)[0])

    # Optional rule-based override (disabled): take a win / block if one exists.
    # valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    # for i in valid_moves:
    #     if (check_win(board=obs.board, row=config.rows, col=config.columns, choice=i, player=2)):
    #         predict = i
    # for i in valid_moves:
    #     if (check_win(board=obs.board, row=config.rows, col=config.columns, choice=i, player=1)):
    #         predict = i

    # Bare-bones board display for debugging (disabled).
    # new_board, _ = get_position(board.reshape(config.rows, config.columns), predict, 1)
    # print(new_board)
    # global win_p

    # win = check_win(list(board), config.rows, config.columns, predict, 1)
    # if (win): win_p += 1
    # print(win)
    return predict

def human_play(obs, config):
    """Placeholder for a human-controlled agent.

    NOTE(review): unfinished stub. It only converts ``obs["board"]`` to a
    NumPy array and returns ``None``; no move is ever chosen.
    """
    current_board = np.array(obs["board"])
    return None


In [None]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    """Play ``n_rounds`` ConnectX games between two agents and print a summary.

    Roughly half of the games are played with ``agent1`` moving first and the
    rest with ``agent2`` moving first. Prints both win rates (rounded to two
    decimals) and the number of invalid plays per agent; returns nothing.
    """
    # Use default Connect Four setup
    game_setup = {'rows': 6, 'columns': 7, 'inarow': 4}
    games_agent1_first = n_rounds // 2
    games_agent2_first = n_rounds - games_agent1_first

    # Agent 1 goes first (roughly) half the time
    outcomes = evaluate("connectx", [agent1, agent2], game_setup, [], games_agent1_first)

    # Agent 2 goes first for the remaining games; swap each result pair so
    # that every entry reads [agent1 result, agent2 result].
    swapped_games = evaluate("connectx", [agent2, agent1], game_setup, [], games_agent2_first)
    outcomes.extend([[second, first] for first, second in swapped_games])

    total_games = len(outcomes)
    agent1_wins = outcomes.count([1, -1])
    agent2_wins = outcomes.count([-1, 1])
    print("Agent 1 Win Percentage:", np.round(agent1_wins / total_games, 2))
    print("Agent 2 Win Percentage:", np.round(agent2_wins / total_games, 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [None]:
# Evaluate the trained policy against the rule-based opponent over 500 games.
get_win_percentages(use_policy, opponent, 500)



In [None]:
#model.save("connect_x_tie_048")