In [None]:
import inspect
import os
import sys

from kaggle_environments import make, agent

In [None]:
def write_agent_to_file(function, file):
    """Append the source code of ``function`` to ``file`` and report it.

    Args:
        function: Object whose source is retrieved with ``inspect.getsource``.
        file: Path of the destination file. It is created when missing and
            appended to when it already exists.
    """
    # Append mode already creates a missing file, so a single mode covers
    # both the "new file" and the "existing file" case.
    with open(file, 'a') as handle:
        source = inspect.getsource(function)
        handle.write(source)
        print(function, 'written to', file)

In [None]:
def mmfsabp_agent(obs, config):
    """Choose a ConnectX column with depth-limited minimax and alpha-beta pruning.

    Args:
        obs: Observation exposing ``board`` (flat, row-major sequence of
            ``config.rows * config.columns`` cells, where 0 means empty) and
            ``mark`` (this agent's piece, 1 or 2).
        config: Game configuration exposing ``rows``, ``columns`` and
            ``inarow``.

    Returns:
        The index of the column to play, picked at random among the columns
        that share the best minimax score.
    """

    ######################################
    # Imports, vars and helper functions #
    ######################################

    # Imports and helpers stay inside the function on purpose: the notebook
    # exports this function's source on its own (via inspect.getsource), so
    # the function has to be self-contained.
    import random
    import numpy as np

    # Search depth in plies, counting the agent's own move
    N_STEPS = 4

    # Gets board at next step if agent drops piece in selected column
    def drop_piece(grid, col, mark, config):
        next_grid = grid.copy()

        # Scan upward from the bottom row for the first empty cell. Callers
        # only pass columns whose top cell is empty, so one always exists.
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark

        return next_grid

    # Yields every window of config.inarow consecutive cells as a list:
    # horizontal, vertical, then both diagonal directions
    def iter_windows(grid, config):
        n = config.inarow
        rows, cols = config.rows, config.columns

        # Horizontal
        for row in range(rows):
            for col in range(cols-(n-1)):
                yield list(grid[row, col:col+n])

        # Vertical
        for row in range(rows-(n-1)):
            for col in range(cols):
                yield list(grid[row:row+n, col])

        # Positive Diagonal (row and column both increase)
        for row in range(rows-(n-1)):
            for col in range(cols-(n-1)):
                yield list(grid[range(row, row+n), range(col, col+n)])

        # Negative Diagonal (row decreases while column increases)
        for row in range(n-1, rows):
            for col in range(cols-(n-1)):
                yield list(grid[range(row, row-n, -1), range(col, col+n)])

    # Checks if window satisfies heuristic conditions:
    # exactly num_discs cells hold piece and all remaining cells are empty
    def check_window(window, num_discs, piece, config):
        return (
            window.count(piece) == num_discs
            and window.count(0) == config.inarow-num_discs
        )

    # Counts number of windows satisfying specified heuristic conditions
    def count_windows(grid, num_discs, piece, config):
        return sum(
            check_window(window, num_discs, piece, config)
            for window in iter_windows(grid, config)
        )

    # Checks if agent or opponent has filled the whole window
    def is_terminal_window(window, config):
        return (
            window.count(1) == config.inarow
            or window.count(2) == config.inarow
        )

    # Checks if game has ended
    def is_terminal_node(grid, config):
        # Check for draw: no empty cell left in the top row
        if list(grid[0, :]).count(0) == 0:
            return True

        # Check for win: horizontal, vertical or diagonal
        return any(
            is_terminal_window(window, config)
            for window in iter_windows(grid, config)
        )

    # Calculates value of heuristic for grid from the point of view of mark
    def get_heuristic(grid, mark, config):
        # "Win" = a full window, "near win" = one disc short with the last
        # cell empty. Derived from config.inarow so the scoring also works
        # when inarow is not 4 (identical to counting threes/fours when it is).
        win = config.inarow
        near_win = config.inarow-1
        opp = 3-mark

        num_near = count_windows(grid, near_win, mark, config)
        num_wins = count_windows(grid, win, mark, config)
        num_near_opp = count_windows(grid, near_win, opp, config)
        num_wins_opp = count_windows(grid, win, opp, config)

        return (
            num_near
            + 1e6*num_wins
            - 1e2*num_near_opp
            - 1e4*num_wins_opp
        )

    # MiniMax with Fail-Soft Alpha-Beta Pruning
    # (a, b) is the alpha-beta window; mark is always the agent's own piece
    def mmfsabp(node, depth, a, b, maximizingPlayer, mark, config):
        # Leaves are scored with the heuristic; at depth 0 the terminal scan
        # is skipped because the result would be the same either way
        if depth == 0 or is_terminal_node(node, config):
            return get_heuristic(node, mark, config)

        valid_moves = [c for c in range(config.columns) if node[0][c] == 0]

        if maximizingPlayer:
            # np.inf rather than np.Inf: the capitalised alias was removed
            # in NumPy 2.0
            value = -np.inf

            for col in valid_moves:
                child = drop_piece(node, col, mark, config)

                value = max(value, mmfsabp(child, depth-1, a, b, False, mark, config))

                a = max(a, value)

                # Beta cutoff
                if value >= b:
                    break

            return value

        else:
            value = np.inf

            for col in valid_moves:
                child = drop_piece(node, col, (3-mark), config)

                value = min(value, mmfsabp(child, depth-1, a, b, True, mark, config))

                b = min(b, value)

                # Alpha cutoff
                if value <= a:
                    break

            return value

    # Calculates value of dropping piece in selected column
    def score_move(grid, col, mark, config, nsteps):
        next_grid = drop_piece(grid, col, mark, config)
        # The agent's move has just been played, so the opponent moves next
        return mmfsabp(next_grid, nsteps-1, -np.inf, np.inf, False, mark, config)


    #########################
    # Agent makes selection #
    #########################

    # Get list of valid moves (columns whose top cell is still empty)
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]

    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)

    # Use minimax to assign a score to each possible move
    scores = {
        col: score_move(grid, col, obs.mark, config, N_STEPS)
        for col in valid_moves
    }

    # Best score, computed once instead of once per candidate column
    best_score = max(scores.values(), default=-np.inf)

    # Get a list of columns (moves) that maximize the score
    max_cols = [col for col, score in scores.items() if score == best_score]

    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [None]:
# Export the agent's source to the submission file
# (the source is appended when 'submission.py' already exists)
write_agent_to_file(function=mmfsabp_agent, file='submission.py')

In [None]:
# Load the exported agent back from disk to check that the file works on its own.
# sys.stdout is saved and restored around the load — presumably because the
# loader can leave it redirected (TODO confirm); the finally clause restores
# it even when loading fails.
out = sys.stdout
try:
    # Read the same relative path the agent was written to, instead of a
    # hard-coded absolute directory
    submission = agent.read_file('submission.py')
    agt = agent.get_last_callable(submission)
finally:
    sys.stdout = out

In [None]:
# Smoke test: let the loaded agent play against itself and report whether
# both players finished with status 'DONE'
env = make('connectx', debug=True)
env.run([agt, agt])
if env.state[0].status == 'DONE' and env.state[1].status == 'DONE':
    print('Success!')
else:
    print('Failed...')