In [1]:
from kaggle_environments import make, evaluate

# Create the game environment
# Set debug=True to see the errors if your agent refuses to run
env = make("connectx", debug=True)

No pygame installed, ignoring import


In [2]:
def my_agent(obs, config):
    
    ################################
    # Imports and helper functions #
    ################################
    
    import numpy as np
    import random
    from kaggle_environments import make, evaluate
    
    env = make("connectx" , debug=True)
    # Gets board at next step if agent drops piece in selected column
    def drop_piece(grid, col, piece, config):
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = piece
        return next_grid

    # Returns True if dropping piece in column results in game win
    def check_winning_move(obs, config, col, piece):
        # Convert the board to a 2D grid
        grid = np.asarray(obs.board).reshape(config.rows, config.columns)
        next_grid = drop_piece(grid, col, piece, config)
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(next_grid[row,col:col+config.inarow])
                if window.count(piece) == config.inarow:
                    return True
        # vertical
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(next_grid[row:row+config.inarow,col])
                if window.count(piece) == config.inarow:
                    return True
        # positive diagonal
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(next_grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if window.count(piece) == config.inarow:
                    return True
        # negative diagonal
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(next_grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if window.count(piece) == config.inarow:
                    return True
        return False
    
    #########################
    # Agent makes selection #
    #########################
    
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark):
            return col
    return random.choice(valid_moves)

In [3]:
import inspect
import os

f = open("submission.py", "w")
f.write(inspect.getsource(my_agent))
f.close()

print("my_agent", "written to", "submission.py")

my_agent written to submission.py


In [4]:
import sys
from kaggle_environments import utils

out = sys.stdout
submission = utils.read_file("/kaggle/working/submission.py")
agent = "/kaggle/working/submission.py"
sys.stdout = out

env = make("connectx", debug=True)
env.run([agent, agent])
env.render(mode="ipython", width=500, height=450)
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")


Success!


In [5]:
from kaggle_environments import evaluate, make, utils
import numpy as np
import random

# Helper function: gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, mark, config):
    next_grid = grid.copy()
    for row in range(config.rows - 1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = mark
    return next_grid

# Helper function: checks if window satisfies heuristic conditions
def check_window(window, num_discs, piece, config):
    return (window.count(piece) == num_discs and window.count(0) == config.inarow - num_discs)

# Helper function: counts number of windows satisfying specified heuristic conditions
def count_windows(grid, num_discs, piece, config):
    num_windows = 0
    # Horizontal
    for row in range(config.rows):
        for col in range(config.columns - (config.inarow - 1)):
            window = list(grid[row, col:col + config.inarow])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # Vertical
    for row in range(config.rows - (config.inarow - 1)):
        for col in range(config.columns):
            window = list(grid[row:row + config.inarow, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # Positive diagonal
    for row in range(config.rows - (config.inarow - 1)):
        for col in range(config.columns - (config.inarow - 1)):
            window = list(grid[range(row, row + config.inarow), range(col, col + config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # Negative diagonal
    for row in range(config.inarow - 1, config.rows):
        for col in range(config.columns - (config.inarow - 1)):
            window = list(grid[range(row, row - config.inarow, -1), range(col, col + config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

# Heuristic evaluation for twos and threes in a row
def get_heuristic(grid, mark, config):
    num_threes = count_windows(grid, 3, mark, config)
    num_twos = count_windows(grid, 2, mark, config)
    print(f"Threes: {num_threes}, Twos: {num_twos}")
    return num_threes, num_twos

# Agent for running the game and counting twos and threes
def custom_agent(obs, config):
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    print(f"Grid \n", grid)
    # Count twos and threes in a row
    num_threes, num_twos = get_heuristic(grid, obs.mark, config)
    
    # Example of choosing a random valid move
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    return random.choice(valid_moves)

# Play and display the game
env = make("connectx", debug=True)
env.render()

# Run the game with your custom agent
env.run([custom_agent, "random"])

# Render the game (optional)
env.render(mode="ipython", width=500, height=450)


Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]]
Threes: 0, Twos: 0
Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 1 0 2 0 0]]
Threes: 0, Twos: 0
Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 1 1 2 2 0 0]]
Threes: 0, Twos: 0
Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 2 0 1 0 0 0]
 [0 1 1 2 2 0 0]]
Threes: 0, Twos: 1
Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 2 0 1 0 0 2]
 [0 1 1 2 2 0 1]]
Threes: 0, Twos: 1
Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 1 0 0 0]
 [0 2 2 1 0 0 2]
 [0 1 1 2 2 0 1]]
Threes: 0, Twos: 2
Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 1 0 0 1]
 [0 2 2 1 0 0 2]
 [2 1 1 2 2 0 1]]
Threes: 0, Twos: 3
Grid 
 [[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 1]
 [0 0 0 1 0 0 1]
 [2 2 2 1 0 0 2]
 [2 1

In [6]:
#COEN 4850 Project 1
#One-Step and N-Step lookahead agents
#All of this is code from the tutorials on D2L
from kaggle_environments import evaluate, make, utils
import numpy as np
import random
import time

#Helper Functions for N-Step lookahead

# Helper function for score_move: gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, mark, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = mark
    return next_grid

# Helper function for get_heuristic: checks if window satisfies heuristic conditions
def check_window(window, num_discs, piece, config):
    return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)
    
# Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
def count_windows(grid, num_discs, piece, config):
    num_windows = 0
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

# Helper function for minimax: checks if agent or opponent has four in a row in the window
def is_terminal_window(window, config):
    return window.count(1) == config.inarow or window.count(2) == config.inarow

# Helper function for minimax: checks if game has ended
def is_terminal_node(grid, config):
    # Check for draw 
    if list(grid[0, :]).count(0) == 0:
        return True
    # Check for win: horizontal, vertical, or diagonal
    # horizontal 
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if is_terminal_window(window, config):
                return True
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if is_terminal_window(window, config):
                return True
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    return False

# Minimax implementation
def minimax(node, depth, maximizingPlayer, mark, config):
    is_terminal = is_terminal_node(node, config)
    valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
    if depth == 0 or is_terminal:
        return NS_Get_heuristic(node, mark, config)
    if maximizingPlayer: #max's turn
        value = -np.inf
        for col in valid_moves:
            child = drop_piece(node, col, mark, config)
            value = max(value, minimax(child, depth-1, False, mark, config))
        return value
    else: #min's turn
        value = np.inf
        for col in valid_moves:
            child = drop_piece(node, col, mark%2+1, config)
            value = min(value, minimax(child, depth-1, True, mark, config))
        return value

#Get Scores for each possible move using minimax
def NS_score_move(grid, col, mark, config, nsteps):
    next_grid = drop_piece(grid, col, mark, config)
    score = minimax(next_grid, nsteps-1, False, mark, config)
    return score

#This is the heuristic used for N-Step
def NS_Get_heuristic(grid, mark, config):
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    num_fours_opp = count_windows(grid, 4, mark%2+1, config)
    avg_mid = getAvgMiddle(grid, mark)
    score = 1e1*num_threes - 1e3*num_threes_opp - 1e5*num_fours_opp + 1e7*num_fours + avg_mid
    return score

def getAvgMiddle(grid, mark):
    yPos = []
    for i in range(len(grid)):
        for j in range(len(grid[i])):
            if(grid[i][j] == mark):
                yPos.append(i)
    sum = 0
    for i in range(len(yPos)):
        sum += yPos[i]
    avg = (sum / len(yPos) - 3-.1) 
    return 1/avg

N_STEPS = 3 #Change this value to change how many steps ahead the agent looks
#Agent for N-Step lookahead
#This will also keep track of how lng it takes to make a move
def NStepAgent(obs, config):
    StartTime=time.time()#Start the timer
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step
    scores = dict(zip(valid_moves, [NS_score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    EndTime=time.time()#End the timer
    #Display turn length and check if it exceeds 2 seconds
    print("Time = ",EndTime-StartTime)
    if(EndTime-StartTime >2):
        print("Too long")
    return random.choice(max_cols)

#Play and display the game
env = make("connectx", debug=True)
env.render()
env.run([NStepAgent, NStepAgent])
env.render(mode="ipython", width=500, height=450)
#This will show two agents with N-Step lookahead play against each other

Time =  0.9678676128387451
Time =  0.966184139251709
Time =  0.949040412902832
Time =  0.9431524276733398
Time =  0.9733479022979736
Time =  0.8432571887969971
Time =  0.9480147361755371
Time =  0.9559733867645264
Time =  0.9574592113494873
Time =  0.8988611698150635
Time =  0.5983197689056396
Time =  0.6038632392883301
Time =  0.5551395416259766
Time =  0.5675618648529053
Time =  0.5616850852966309
Time =  0.34706616401672363
Time =  0.3514389991760254
Time =  0.3480687141418457
Time =  0.30233240127563477
Time =  0.35053586959838867
Time =  0.3344094753265381
Time =  0.36550354957580566
Time =  0.29773449897766113
Time =  0.306652307510376
Time =  0.2931406497955322
Time =  0.35735416412353516
Time =  0.32862257957458496
Time =  0.3623635768890381
Time =  0.2888050079345703
Time =  0.2728879451751709
Time =  0.2921595573425293
Time =  0.17454147338867188
Time =  0.14350128173828125
Time =  0.1405775547027588
Time =  0.09610700607299805
Time =  0.07465481758117676


In [7]:

#COEN 4850 Project 1
#One-Step and N-Step lookahead agents
#All of this is code from the tutorials on D2L
from kaggle_environments import evaluate, make, utils
import numpy as np
import random
import time

#Helper Functions for N-Step lookahead

# Helper function for score_move: gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, mark, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = mark
    return next_grid

# Helper function for get_heuristic: checks if window satisfies heuristic conditions
def check_window(window, num_discs, piece, config):
    return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)
    
# Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
def count_windows(grid, num_discs, piece, config):
    num_windows = 0
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

# Helper function for minimax: checks if agent or opponent has four in a row in the window
def is_terminal_window(window, config):
    return window.count(1) == config.inarow or window.count(2) == config.inarow

# Helper function for minimax: checks if game has ended
def is_terminal_node(grid, config):
    # Check for draw 
    if list(grid[0, :]).count(0) == 0:
        return True
    # Check for win: horizontal, vertical, or diagonal
    # horizontal 
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if is_terminal_window(window, config):
                return True
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if is_terminal_window(window, config):
                return True
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    return False

# Minimax implementation
def minimax(node, depth, maximizingPlayer, mark, config):
    is_terminal = is_terminal_node(node, config)
    valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
    if depth == 0 or is_terminal:
        return NS_Get_heuristic(node, mark, config)
    if maximizingPlayer: #max's turn
        value = -np.inf
        for col in valid_moves:
            child = drop_piece(node, col, mark, config)
            value = max(value, minimax(child, depth-1, False, mark, config))
        return value
    else: #min's turn
        value = np.inf
        for col in valid_moves:
            child = drop_piece(node, col, mark%2+1, config)
            value = min(value, minimax(child, depth-1, True, mark, config))
        return value

#Get Scores for each possible move using minimax
def NS_score_move(grid, col, mark, config, nsteps):
    next_grid = drop_piece(grid, col, mark, config)
    score = minimax(next_grid, nsteps-1, False, mark, config)
    return score

#This is the heuristic used for N-Step
def NS_Get_heuristic(grid, mark, config):
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    num_fours_opp = count_windows(grid, 4, mark%2+1, config)
    avg_mid = getAvgMiddle(grid, mark)
    score = 1e1*num_threes - 1e3*num_threes_opp - 1e5*num_fours_opp + 1e7*num_fours + avg_mid
    return score

def getAvgMiddle(grid, mark):
    yPos = []
    for i in range(len(grid)):
        for j in range(len(grid[i])):
            if(grid[i][j] == mark):
                yPos.append(j)
    sum = 0
    for i in range(len(yPos)):
        sum += yPos[i]
    avg = 1/((sum / len(yPos) - 3) +.1)
    return avg

N_STEPS = 3 #Change this value to change how many steps ahead the agent looks
#Agent for N-Step lookahead
#This will also keep track of how lng it takes to make a move
def NStepAgent(obs, config):
    import numpy as np
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step
    scores = dict(zip(valid_moves, [NS_score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    EndTime=time.time()#End the timer
    #Display turn length and check if it exceeds 2 seconds
    return random.choice(max_cols)



In [8]:
import inspect
import os

f = open("submission.py", "w")
f.write(inspect.getsource(NStepAgent))
f.close()

print("NStepAgent", "written to", "submission.py")

NStepAgent written to submission.py


In [9]:
from kaggle_environments import make, evaluate
import sys
from kaggle_environments import utils
from kaggle_environments import agent
import numpy as np

out = sys.stdout
submission = utils.read_file("/kaggle/working/submission.py")
agent = "/kaggle/working/submission.py"
sys.stdout = out

env = make("connectx", debug=True)
env.run([agent, agent])
env.render(mode="ipython", width=500, height=450)
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/kaggle_environments/agent.py", line 159, in act
    action = self.agent(*args)
  File "/opt/conda/lib/python3.10/site-packages/kaggle_environments/agent.py", line 130, in callable_agent
    agent(*args) \
  File "/kaggle/working/submission.py", line 8, in NStepAgent
    scores = dict(zip(valid_moves, [NS_score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves]))
  File "/kaggle/working/submission.py", line 8, in <listcomp>
    scores = dict(zip(valid_moves, [NS_score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves]))
NameError: name 'NS_score_move' is not defined
Error: ['Traceback (most recent call last):\n', '  File "/opt/conda/lib/python3.10/site-packages/kaggle_environments/agent.py", line 159, in act\n    action = self.agent(*args)\n', '  File "/opt/conda/lib/python3.10/site-packages/kaggle_environments/agent.py", line 130, in callable_agent\n    agent(*args) \\\n',

Failed...
