In [None]:
import numpy as np
import time
import tkinter as tk
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Dense
from keras.optimizers import Adam
from keras.models import save_model
import random
from collections import deque
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pylab as plt

In [None]:
# --- Board geometry ---------------------------------------------------------
unit = 40                      # side length, in pixels, of one board cell
r = (unit - 10) / 2            # radius of a hole / piece drawn inside a cell
maze_h, maze_w = 6, 7          # board rows, board columns

# --- Rewards handed out by Board.get_state_reward ---------------------------
reward_w, reward_l, reward_draw = 20, -20, 10   # win, loss, draw
reward_illegal = -1000         # move into a full column
reward_e = -1                  # any other move; negative to reduce steps

# --- Per-episode statistics filled in by main() -----------------------------
moves, illegal_moves = [], []
moves_avg, illegal_moves_avg = [], []
episode_count = 100            # number of self-play training episodes

In [None]:
class Board:
    """Tkinter Connect 4 board that doubles as the game environment.

    ``state`` is a ``maze_h`` x ``maze_w`` array holding 0 for an empty cell
    and 1 or 2 for a cell owned by that player. Row 0 is drawn directly under
    the status strip, so pieces stack upwards from row ``maze_h - 1``.
    """

    def __init__(self):
        self.window = tk.Tk()
        self.window.title("Connect 4")
        self.window.geometry('{0}x{1}'.format(maze_w*unit,(maze_h+1)*unit))
        self.action_space = list(range(maze_w))  # one action per column
        self.n_action = len(self.action_space)
        self.state = np.zeros([maze_h,maze_w])
        self.num_piece = 0
        self.click_allowed = 1  # 1 = mouse clicks are accepted as moves
        self.player = 1  # player whose turn it is
        self.playstyle = 1 # 1 = pvp, 2 = pva
        self.color = {1:"red", 2:"yellow"}
        self.build_board()

    def build_board(self):
        """Create the canvas, the status strip, the empty holes and the click binding."""
        self.canvas = tk.Canvas(self.window,bg='black',width=maze_w*unit,height=(maze_h+1)*unit)

        # White strip across the top that carries the status text.
        self.top = self.canvas.create_rectangle(
            0, 0,
            maze_w*unit, unit,
            fill="white"
        )

        self.mylabel = self.canvas.create_text((maze_w*unit/2), (unit/2), text="Player 1's turn")

        for x in range(maze_w):
            for y in range(maze_h):
                cx = (x*unit) + (unit/2)
                cy = (y*unit) + (unit/2) + unit  # + unit: skip the status strip
                # self.s ends up holding the canvas id of the last oval drawn.
                self.s = self.canvas.create_oval(
                    cx-r,cy-r,
                    cx+r,cy+r,
                    fill="white"
                )

        self.canvas.bind("<Button-1>", self._on_click)

        self.canvas.pack()

    def _on_click(self, event):
        """Treat a left click as a move by ``self.player`` in the clicked column."""
        if self.click_allowed != 1:
            return

        action = event.x // unit
        # Ignore clicks whose x falls outside the board; a negative index
        # would otherwise silently wrap around to a column on the right.
        if not 0 <= action < maze_w:
            return

        _, reward, done, _ = self.get_state_reward(self.player, action)
        if done:
            if reward == reward_w:
                self.click_allowed = 0
                self.canvas.itemconfigure(self.mylabel, text="Player "+str(self.player)+" wins")
            else:
                self.canvas.itemconfigure(self.mylabel, text="Draw")
        elif reward == reward_e:
            # Legal, non-terminal move: hand the turn to the other player.
            self.player = 2 if self.player == 1 else 1
            self.canvas.itemconfigure(self.mylabel, text="Player "+str(self.player)+"'s turn")
            if self.playstyle == 2:
                # Against the agent, block clicks until they are re-enabled.
                self.click_allowed = 0
        # An illegal move (full column) leaves turn and label untouched.

    def render(self):
        """Process pending Tk events and redraw the window."""
        self.window.update()

    def reset(self):
        """Clear all pieces, give the turn to player 1 and return the empty state."""
        self.window.update()
        self.num_piece = 0
        self.canvas.delete("pieces")
        self.state = np.zeros([maze_h,maze_w])
        self.player = 1
        self.canvas.itemconfigure(self.mylabel, text="Player 1's turn")
        self.render()
        return self.state

    def get_state_reward(self, player, action):
        """Drop ``player``'s piece into column ``action`` and score the move.

        Returns ``(state, reward, done, win_type)``:

        * win        -> ``reward_w``, done, win_type 1/2/3 (see ``is_terminal``)
        * board full -> ``reward_draw``, done, win_type 0
        * full column (nothing placed) -> ``reward_illegal``, not done, 0
        * otherwise  -> ``reward_e``, not done, 0

        ``state`` is the board's own array, not a copy.
        """
        placed = False
        # Scan from the bottom row upwards for the first empty cell.
        for y in range(maze_h-1, -1, -1):
            if self.state[y][action] == 0:
                self.state[y][action] = player
                self.place_piece(action, y, player)
                placed = True
                break

        reward, win_type = reward_illegal, 0
        if placed:
            reward, win_type = self.is_terminal(self.state, player)
            # A win must be checked before the full-board test, otherwise a
            # winning move that is also the last piece is scored as a draw.
            if reward == reward_w:
                return self.state, reward, True, win_type

        if self.num_piece == (maze_h*maze_w):
            return self.state, reward_draw, True, 0

        return self.state, reward, False, win_type

    def place_piece(self, x, y, player):
        """Draw ``player``'s piece at column ``x``, row ``y`` and count it."""
        cx = (x*unit) + (unit/2)
        cy = (y*unit) + (unit/2) + unit
        # Tagged "pieces" so reset() can delete every piece in one call.
        self.s = self.canvas.create_oval(
            cx-r,cy-r,
            cx+r,cy+r,
            fill=self.color[player],
            tags="pieces"
        )
        self.num_piece += 1

    def is_terminal(self, board, player):
        """Check whether ``player`` has four in a row on ``board``.

        Returns ``(reward_w, win_type)`` on a win, with win_type 1 for
        horizontal, 2 for vertical and 3 for either diagonal; otherwise
        ``(reward_e, 0)``.
        """
        # Horizontal
        for col in range(maze_w-3):
            for row in range(maze_h):
                if all(board[row][col+k] == player for k in range(4)):
                    return reward_w, 1

        # Vertical
        for col in range(maze_w):
            for row in range(maze_h-3):
                if all(board[row+k][col] == player for k in range(4)):
                    return reward_w, 2

        # Diagonal with row and column increasing together
        for col in range(maze_w-3):
            for row in range(maze_h-3):
                if all(board[row+k][col+k] == player for k in range(4)):
                    return reward_w, 3

        # Diagonal with row decreasing as column increases
        for col in range(maze_w-3):
            for row in range(3, maze_h):
                if all(board[row-k][col+k] == player for k in range(4)):
                    return reward_w, 3

        return reward_e, 0

In [None]:
class Agent:
    """Q-learning agent backed by a small Keras network.

    The network maps a flattened state of length ``state_size`` to one
    Q-value per column. ``memory1`` and ``memory2`` hold the transitions
    recorded for player 1 and player 2 respectively and are replayed by
    ``experience_replay``.
    """

    def __init__(self, state_size, alpha=1, gamma=0.95, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995, model_name="connect4.hdf5"):
        """Load the model stored at ``model_name``, or build a fresh one.

        alpha is the learning rate of the Q-update, gamma the discount
        factor, and epsilon the exploration probability, which is multiplied
        by ``epsilon_decay`` after each replay until it reaches
        ``epsilon_min``.
        """
        self.action_history = []
        self.state_size = state_size
        self.action_size = maze_w
        self.reward = 0
        self.memory1 = deque(maxlen=(maze_h*maze_w))
        self.memory2 = deque(maxlen=(maze_h*maze_w))
        self.model_name = model_name
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        try:
            self.model = load_model(model_name)
        except OSError:
            # No saved model could be read: start from an untrained network.
            self.model = self.create_model()

    def create_model(self):
        """Build and compile the state -> Q-values network."""
        model = Sequential()
        model.add(Dense(units=32, input_shape=(self.state_size, ), activation="relu"))
        model.add(Dense(units=8, activation="relu"))
        model.add(Dense(units=self.action_size, activation="linear"))
        model.compile(loss="mse", optimizer=Adam())
        return model

    def reset(self):
        """Forget the per-episode history and both replay memories."""
        self.reward = 0
        self.action_history = []
        self.memory1 = deque(maxlen=(maze_h*maze_w))
        self.memory2 = deque(maxlen=(maze_h*maze_w))

    def act(self, state):
        """Pick a column epsilon-greedily for ``state`` and record it."""
        state = np.reshape(state, (1, self.state_size))
        if np.random.rand() <= self.epsilon:
            action = random.randrange(self.action_size)
        else:
            actions = self.model.predict(state, verbose=0)
            action = np.argmax(actions[0])

        self.action_history.append(action)

        return action

    def _fit_transition(self, state, action, reward, next_state, done):
        """Apply one Q-learning update for a single transition.

        Only the Q-value of ``action`` is moved; the other outputs are fitted
        to the network's own current predictions.
        """
        state = np.reshape(state, (1, self.state_size))
        next_state = np.reshape(next_state, (1, self.state_size))

        q_values = self.model.predict(state, verbose=0)
        current_q_value = q_values[0][action]
        if done:
            # Terminal transition: there is no future value to bootstrap from.
            td_target = reward
        else:
            next_q_value = np.amax(self.model.predict(next_state, verbose=0)[0])
            td_target = reward + self.gamma*next_q_value

        # Same rule for both cases: Q <- Q + alpha * (target - Q).
        q_values[0][action] = current_q_value + self.alpha*(td_target - current_q_value)
        self.model.fit(state, q_values, epochs=10, verbose=0)

    def learn(self, state, action, reward, next_state, done):
        """Train the network on one freshly observed transition."""
        self._fit_transition(state, action, reward, next_state, done)

    def experience_replay(self):
        """Re-train on every stored transition of both players, then decay epsilon."""
        for memory in (self.memory1, self.memory2):
            for state, action, reward, next_state, done in memory:
                self._fit_transition(state, action, reward, next_state, done)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        

In [None]:
def main(choice):
    """Open the board and run it in the requested mode.

    choice == 2: a human (player 1, mouse) plays one game against the agent
                 (player 2, epsilon 0).
    choice == 3: the agent plays ``episode_count`` games against itself,
                 learning as it goes; afterwards the model is saved and the
                 move statistics are plotted.
    Any other value just shows the board with no agent attached.

    Blocks in the Tk main loop until the window is closed.
    """
    board = Board()
    # n_episodes is a separate local on purpose: assigning to `episode_count`
    # here would make that name local to main() and leave it unbound in
    # training mode.
    if choice == 2:
        agent = Agent((maze_h*maze_w)+1, epsilon=0.0)
        board.click_allowed = 1
        board.player = 1
        board.playstyle = 2
        n_episodes = 1
    elif choice == 3:
        agent = Agent((maze_h*maze_w)+1)
        board.click_allowed = 0
        board.player = 1
        n_episodes = episode_count
    else:
        board.window.mainloop()
        return

    def plot_reward_movements():
        """Plot per-episode and running-average move / illegal-move counts."""
        episodes = range(1, len(moves)+1)
        fig, axs = plt.subplots(2, 2)
        axs[0, 0].plot(episodes, moves)
        axs[0, 0].set_title('Moves')
        axs[0, 1].plot(episodes, moves_avg)
        axs[0, 1].set_title('Average Moves')
        axs[1, 0].plot(episodes, illegal_moves)
        axs[1, 0].set_title('Illegal Moves')
        axs[1, 1].plot(episodes, illegal_moves_avg)
        axs[1, 1].set_title('Average Illegal Moves')
        # Lay out before showing so the spacing applies to what is displayed.
        fig.tight_layout(pad=3.0)
        fig.show()

    def append_running_mean(averages, value):
        """Extend ``averages`` with the mean of all values seen so far."""
        if not averages:
            averages.append(value)
        else:
            seen = len(averages)
            averages.append(((averages[-1]*seen)+value)/(seen+1))

    def run_experiment():
        """Play ``n_episodes`` games and collect statistics."""
        start_time = time.time()
        w1 = 0
        l1 = 0
        w2 = 0
        l2 = 0
        d = 0
        win_type = {1:0, 2:0, 3:0}

        # Start from empty statistics so repeated runs in one kernel do not
        # pile up and break the episode axis of the plots.
        for series in (moves, illegal_moves, moves_avg, illegal_moves_avg):
            series.clear()

        for e in range(1,n_episodes+1):
            print("Episode  {0}/{1}".format(e, n_episodes))
            agent.reset()
            board.reset()
            moves_count = 0
            illegal_moves_count = 0

            while True:
                agent_turn = (choice == 3) or (board.player == 2)
                if not agent_turn:
                    # Human's turn: pump Tk events so the click handler runs.
                    try:
                        board.render()
                    except tk.TclError:
                        return  # window was closed while waiting
                    # The click handler disables clicks without passing the
                    # turn only when the human has just won.
                    if board.click_allowed == 0 or board.num_piece == maze_h*maze_w:
                        break
                    time.sleep(0.01)
                    continue

                moves_count += 1
                # Read the live board: the human may have moved since the
                # agent's last turn.
                state = board.state.flatten()
                mstate = np.append(state,board.player) #mstate = state with player appended
                action = agent.act(mstate)
                nstate, reward, done, wt = board.get_state_reward(board.player, action)
                nstate = nstate.flatten()
                mnstate = np.append(nstate,board.player) #mnstate = next state with player appended
                if board.player == 1:
                    agent.memory1.append((mstate, action, reward, mnstate, done))
                elif board.player == 2:
                    agent.memory2.append((mstate, action, reward, mnstate, done))
                agent.learn(mstate, action, reward, mnstate, done)

                if done:
                    if reward == reward_w:
                        board.canvas.itemconfigure(board.mylabel, text="Player "+str(board.player)+" wins")
                    elif reward == reward_draw:
                        board.canvas.itemconfigure(board.mylabel, text="Draw")
                    board.render()

                    if choice == 3:
                        if reward == reward_w:
                            print("Player "+str(board.player)+" wins")
                            win_type[wt] += 1
                            if board.player == 1: #update loss of other player
                                agent.memory2.append((np.append(state,2), action, reward_l, np.append(nstate,2), done))
                                w1 += 1
                                l2 += 1
                            elif board.player == 2:
                                agent.memory1.append((np.append(state,1), action, reward_l, np.append(nstate,1), done))
                                w2 += 1
                                l1 += 1
                        elif reward == reward_draw:
                            print("Draw")
                            d += 1
                            if board.player == 1: #update draw of other player
                                agent.memory2.append((np.append(state,2), action, reward_draw, np.append(nstate,2), done))
                            elif board.player == 2:
                                agent.memory1.append((np.append(state,1), action, reward_draw, np.append(nstate,1), done))
                        print("illegal moves ",illegal_moves_count)
                        print("moves ",moves_count)
                        agent.experience_replay()
                        time.sleep(1)
                    break

                if reward == reward_illegal:
                    # Same player moves again; clicks stay disabled.
                    illegal_moves_count += 1
                else: #switch player
                    board.player = 2 if board.player == 1 else 1
                    board.canvas.itemconfigure(board.mylabel, text="Player "+str(board.player)+"'s turn")
                    if board.playstyle == 2:
                        # Only now is it the human's turn again.
                        board.click_allowed = 1
                board.render()

            moves.append(moves_count)
            illegal_moves.append(illegal_moves_count)
            append_running_mean(moves_avg, moves_count)
            append_running_mean(illegal_moves_avg, illegal_moves_count)

        if choice == 3:
            end_time = time.time()
            print("Training time = {0} sec".format(end_time-start_time))
            print("Total Episodes - ",n_episodes)
            print("Agent 1")
            print("Wins - {0},Losses - {1},Draws -{2}".format(w1,l1,d))
            print("Agent 2")
            print("Wins - {0},Losses - {1},Draws -{2}".format(w2,l2,d))
            print("Win Types")
            print("Horizontal - ",win_type[1])
            print("Vertical - ",win_type[2])
            print("Diagonal - ",win_type[3])
            # Save to the path the agent tries to load from on start-up.
            save_model(agent.model,agent.model_name)
            plot_reward_movements()

    # Start the experiment from inside the Tk loop once the window exists.
    board.window.after(10, run_experiment)
    board.window.mainloop()



In [None]:
def read_choice(reader=input):
    """Prompt until a play style between 1 and 3 is entered and return it.

    ``reader`` is called with no arguments to obtain each answer. Input that
    is not an integer is treated like any other wrong choice instead of
    raising ValueError.
    """
    while True:
        print("Enter play style")
        print("1) Player vs Player")
        print("2) Player vs AI Agent (Evaluation)")
        print("3) AI Agent vs AI Agent (Training)")
        try:
            choice = int(reader())
        except ValueError:
            choice = 0  # fall through to the "wrong choice" message
        if 1 <= choice <= 3:
            return choice
        print("Wrong choice entered! Please enter again")


if __name__=="__main__":
    choice = read_choice()
    if choice == 1:
        # Player vs player needs only the board; clicks drive the game.
        env = Board()
        env.window.mainloop()
    else:
        main(choice)