In [2]:
import abc
import chess
import snake

###################################################################
# Developed by Nicola Rossi (nicolaxrossi) on 19/10/2023
# last modified: 20/10/2023
###################################################################

############################ INTERFACE ############################
class GameInterface(metaclass=abc.ABCMeta):
    @classmethod
    def __subclasshook__(cls, subclass):
        return (hasattr(subclass, 'get_initial_configuration') and 
                callable(subclass.get_initial_configuration) and 
                hasattr(subclass, 'is_final_config') and 
                callable(subclass.is_final_config) or 
                hasattr(subclass, 'get_legal_moves') and 
                callable(subclass.get_legal_moves) or 
                
                hasattr(subclass, 'apply_move') and 
                callable(subclass.apply_move) or 
                hasattr(subclass, 'simulate_move') and 
                callable(subclass.simulate_move) or 
                NotImplemented)
    
    @abc.abstractmethod
    def get_initial_configuration(self):
        """return an initial configuration"""
        raise NotImplementedError

    @abc.abstractmethod
    def is_final_config(self, configuration):
        """check whether the configuration is final or not"""
        raise NotImplementedError

    @abc.abstractmethod
    def get_legal_moves(self, configuration):
        """returns a list containing the set of legal moves, from the given configuration"""
        raise NotImplementedError
        
    @abc.abstractmethod
    def apply_move(self, configuration, move):
        """applies the moves to the configuration"""
        raise NotImplementedError
        
    @abc.abstractmethod
    def simulate_move(self, configuration, move):
        """ simulates the move on the given configuration: makes a copy of the given config., applies the move on the
            copy and returns the copy """
        raise NotImplementedError
    
    @abc.abstractmethod
    def outcome(self, configuration):
        """returns information on the outcome"""
        raise NotImplementedError
        
    @abc.abstractmethod
    def update(self, configuration):
        raise NotImplementedError
        
    @abc.abstractmethod
    def is_failure(self, configuration):
        raise NotImplementedError

############################ CHESS GAME MODEL ############################
class ChessGame(GameInterface):
    
    def get_initial_configuration(self):
        """return an initial configuration"""
        return chess.Board()
    
    def is_final_config(self, configuration:chess.Board):
        """check whether the configuration is final or not
            se configuration.board is None, then the game is not concluded """
        return configuration.outcome() is not None

    def get_legal_moves(self, configuration:chess.Board):
        """returns a list containing the set of legal moves, from the given configuration"""
        # NOTA: list(configuration.legal_moves) ritorna una lista di mosse del tipo Move.from_uci('g1h3')
        return list(configuration.legal_moves)
        
    def apply_move(self, configuration:chess.Board, move:chess.Move):
        """applies the moves to the configuration"""
        configuration.push(move)
    
    def simulate_move(self, configuration:chess.Board, move:chess.Move):
        """simulates the move on the given configuration: makes a copy of the given config., applies the move on the
            copy and returns the copy"""
        copy = configuration.copy()
        copy.push(move)
        return copy
    
    def outcome(self, configuration):
        """returns information on the outcome"""
        outcome_info = {}
        outcome_info['Winner'] = configuration.outcome().winner
        outcome_info['Termination'] = configuration.outcome().termination
        return outcome_info
    
    def update(self, configuration):
        pass
    
    def is_failure(self, configuration):
        pass
    

############################ SNAKE GAME MODEL ############################
class SnakeGame(GameInterface):
    
    def get_initial_configuration(self):
        """return an initial configuration"""
        return snake.Board(10, 10)
    
    def is_final_config(self, configuration:snake.Board):
        """check whether the configuration is final or not, that is: if the apple has been eaten or not
            True -> snake has eaten the apple """
        return configuration.eaten()

    def get_legal_moves(self, configuration:snake.Board):
        """returns a list containing the set of legal moves, from the given configuration"""
        # NOTE: the 'legal_moves' method returns a list of moves (it's not necessary to convert the call result
        # to list type)
        return configuration.legal_moves()
        
    def apply_move(self, configuration:snake.Board, move:str):
        """applies the moves to the configuration"""
        configuration.apply_move(move)

        # check if the snake has eaten an apple. If so, snake must grow in size
        if configuration.eaten():
            configuration.grow_snake()
            #configuration.spawn_apple()
    
    def simulate_move(self, configuration:snake.Board, move:str):
        """simulates the move on the given configuration: makes a copy of the given config., applies the move on the
            copy and returns the copy"""
        
        copy = configuration.copy()
        copy.apply_move(move)
        
        # check if the snake has eaten an apple. If so, snake must grow in size
        if copy.eaten():
            copy.grow_snake()
            #copy.spawn_apple()

        return copy
    
    def update(self, configuration:snake.Board):
        if configuration.eaten():
            configuration.spawn_apple()
            
    
    def outcome(self, configuration:snake.Board):
        """returns information on the outcome"""
        outcome_info = {}
        outcome_info['Score'] = configuration.score
        outcome_info['# Moves'] = configuration.num_moves
        outcome_info['Termination'] = configuration.outcome
        return outcome_info

    def is_failure(self, configuration:snake.Board):
        if configuration.snake_collision():
            return True
        if configuration.wall_collision():
            return True
        
        

In [3]:
class State:
    
    """ class defining the concept of state: it contains a representation of the environment, in our case
        a possible game configuration (for some game) and a reference to the game, used to compute possible
        actions from the state, compute the application of an action to the state and check whether the
        state is final or not """
    
    representation = None
    game = None
    
    def __init__(self, g:GameInterface, rep):
        self.representation = rep
        self.game = g
        
    def __eq__(self, other):
        if isinstance(other, State):
            return self.representation == other.representation
        else:
            return False
        
    def __hash__(self):
        # it is required that representation objects have in thei class an implementation of __hash__ method
        return hash(self.representation)
    
    def __str__(self):
        return f"game: {self.game}, representation: \n{self.representation}"
    
    def actions(self):
        return self.game.get_legal_moves(self.representation)
    
    def apply(self, action):
        return self.__class__(self.game, self.game.simulate_move(self.representation, action))
    
    def is_final(self):
        return self.game.is_final_config(self.representation)
    
    def is_failure(self):
        return self.game.is_failure(self.representation)

In [4]:
class Node:
        
        """ class defined to manage heuristic (and non) search algorithms. It is used because of its convenience
            in storing a state and a value for the path-cost function and for a reference to parent node.
            In particular, a reference to parent node is very useful to reconstruct (backwards) the sequence
            of actions leading from a initial state to a final one (notice action field). """
        
        state = None
        action = None
        parent = None
        path_cost = None
        
        """ in a tree, the root node has parent = None """
        
        def __init__(self, s:State, a, p, pc):
            self.state = s
            self.action = a
            self.parent = p
            self.path_cost = pc
            
        def __eq__(self, other):
            if isinstance(other, Node):
                return self.state == other.state
            else:
                return False
        
        def __lt__(self, other):
            # no exact notion of order among nodes
            return False
            
        def __hash__(self):
            return hash((self.state, self.action, self.parent, self.path_cost))
        
        def __str__(self):
            return f"SELF: {self.state}, {self.action}, PARENT: {self.parent}, {self.path_cost}"

In [5]:
class PathCosts:
    
    """ a simple class-collection of path-costs functions (valuating nodes) """
    
    @staticmethod
    def uniform_cost(n:Node):
        """ if node n' is a successor of node n, then g(n') = g(n) + 1 """
        return n.path_cost + 1

In [51]:
import networkx as nx
import copy

class Heuristics:
    
    """ class-collection of heuristics
        Notice that heuristics, in order to assign a value to a state must interface directly with the 
        module giving methods/representation for the game. So in this class there must be imported
        all the game modules one wants to use! """
    
    @staticmethod
    def snake_h1(s:State):
        
        # retrieve snake head and apple positions
        head_pos = s.representation.snake[0]
        apple_pos = s.representation.apple
        
        # compute Manhattan distance between snake head (P1) and apple (P2), and return it
        return abs(apple_pos[0] - head_pos[0]) + abs(apple_pos[1] - head_pos[1])
    
    @staticmethod
    def snake_h2(s:State):
        
        # retrieve snake head and apple positions
        head_pos = s.representation.snake[0]
        apple_pos = s.representation.apple
        
        if s.representation.snake_collision():
            return float('inf')
        
        if s.representation.wall_collision():
            return float('inf')
        
        # compute Manhattan distance between snake head (P1) and apple (P2), and return it
        return abs(apple_pos[0] - head_pos[0]) + abs(apple_pos[1] - head_pos[1])
        
                
    @staticmethod
    def snake_h3(s:State):
        
        # IDEA: dato che il serpente cambia direzione random, lasciando alcuni buchi in cui può spawnare la mela
        # che portano poi a mangiarsi da solo, si prova a incentivare il rimanere nella stessa direzione, dando una 
        # penalità quando il serpente cambia direzione
        
        # retrieve snake head and apple positions
        head_pos = s.representation.snake[0]
        apple_pos = s.representation.apple
        
        if s.representation.snake_collision():
            return float('inf')
        
        if s.representation.wall_collision():
            return float('inf')
        
        # if the apple is at distance >= 40% of the maximum distance (about 7) check the snake length
        #    if the snake length is 'small', go directly to the apple
        #    otherwise, try to make space
        # if, instead, the snake is closer to the apple, focus on the apple
        # the general idea is: if space is created when the apple is distant, while maybe approaching the apple
        # more space will available to manouvre
        
        print('length: ', len(s.representation.snake))
        print('distance: ', abs(apple_pos[0] - head_pos[0]) + abs(apple_pos[1] - head_pos[1]))
        
        if len(s.representation.snake) <= (0.10 * (s.representation.x_limit * s.representation.y_limit)):
            return (abs(apple_pos[0] - head_pos[0]) + abs(apple_pos[1] - head_pos[1]))

        else:
            
            d = (abs(apple_pos[0] - head_pos[0]) + abs(apple_pos[1] - head_pos[1]))

            k = 0.2

            # Normalize the distance to the range [0, 1] for a 10x10 board
            normalized_distance = d / (s.representation.y_limit +  s.representation.x_limit - 2)

            # Invert the normalized distance so that shorter distances have higher scores
            inverted_normalized_distance = 1 - normalized_distance
            
            # before adding the random component, first consider if the direction has changed: if so, 
            # add a malus equal to inverted_normalized_distance to the score

            score = inverted_normalized_distance * k
            
            pred = s.representation.snake[1]
            prev_dir = (head_pos[0] - pred[0], head_pos[1] - pred[1])
            
            if prev_dir != s.representation.snake_direction:
                score += (inverted_normalized_distance * k)

            # Introduce randomness
            mean = score #score*10  # Mean is your calculated score
            standard_deviation = 1.0  # Adjust this to control the level of randomness
            random_element = abs(random.normalvariate(mean, standard_deviation))

        
            print('score randomness: ', score + random_element)
            # malus se può seguire la sua coda ma non lo fa
            tail_pos = s.representation.snake[len(s.representation.snake)-1]
            t = (abs(tail_pos[0] - head_pos[0]) + abs(tail_pos[1] - head_pos[1]))
            ht_dis = 1 -(t / (s.representation.y_limit +  s.representation.x_limit - 2))
            
            score += ht_dis

            #else:
            x_limit = s.representation.x_limit
            y_limit = s.representation.y_limit
            # incentivare a restare attaccati al muro, quando possibile
            #if head_pos[0] == 0 or head_pos[0] == x_limit-1 or head_pos[1] == 0 or head_pos[1] == y_limit-1:
            #    return score + random_element
            
            return score + random_element #+ 2 # + normalized_ht_dis
            
    
    ############################################## SCACCHI ##############################################################
    
    @staticmethod
    def piece_value(piece:chess.Piece):
        
        if piece.piece_type == chess.PAWN:
            return 1
        
        if piece.piece_type == chess.KNIGHT:
            return 3
        
        if piece.piece_type == chess.BISHOP:
            return 3
        
        if piece.piece_type == chess.ROOK:
            return 5
        
        if piece.piece_type == chess.QUEEN:
            return 9
        
        if piece.piece_type == chess.KING:
            return 0
    
    @staticmethod
    def evaluate_king_safety(board):
        king_square = board.king(board.turn)

        # Evaluate pawn shelter in front of the king
        #pawn_shelter_score = Heuristics.evaluate_pawn_shelter(board, king_square)

        # Evaluate open files near the king
        open_file_score = Heuristics.evaluate_open_files(board, king_square)

        # Evaluate the presence of attacking pieces near the king
        piece_attack_score = Heuristics.evaluate_piece_attack(board, king_square)

        # Evaluate the king's ability to castle
        castling_score = Heuristics.evaluate_castling(board)

        # Combine the scores based on your preferences
        king_safety_score = (open_file_score +
                            piece_attack_score + castling_score)
    
        return king_safety_score
    
    @staticmethod
    def evaluate_pawn_shelter(board, king_square):
        # Evaluate the presence of pawns in front of the king
        pawn_shelter = 0
        for square in chess.SQUARES:
            if king_square < square < king_square + 8:
                piece = board.piece_at(square)
                if piece and piece.color == board.turn and piece.piece_type == chess.PAWN:
                    pawn_shelter += 0.1
        return pawn_shelter
    
    @staticmethod
    def evaluate_open_files(board, king_square):
        # Evaluate the presence of open files near the king
        open_file = 0
        file = chess.square_file(king_square)
        rank = chess.square_rank(king_square)

        for offset in [-1, 1]:

            adjacent_file = file + offset
            adjacent_rank = rank

            if 0 <= adjacent_file <= 7:

                is_open = True

                for i in range(0, 8):

                    if adjacent_rank == 7:
                        adjacent_rank = 0

                    adjacent_square = chess.square(adjacent_file, adjacent_rank)

                    if (board.piece_at(adjacent_square) is not None) and (board.piece_at(adjacent_square).color == board.turn):
                        is_open = False
                        break

                    adjacent_rank += 1

                if is_open:
                    open_file += 0.2

        return -open_file
    
    @staticmethod
    def evaluate_piece_attack(board, king_square):
        
        # Evaluate the presence of attacking pieces near the king
        piece_attack = 0

        for square in chess.SQUARES:

            piece = board.piece_at(square)

            if piece and piece.color != board.turn:

                if king_square in board.attacks(square):
                    piece_attack += 0.3

        return -piece_attack

    @staticmethod
    def evaluate_castling(board):
        # Evaluate the king's ability to castle
        if board.turn == chess.WHITE:
            if board.has_kingside_castling_rights(board.turn) and board.king(board.turn) == chess.E1:
                return 0.2
        else:
            if board.has_kingside_castling_rights(board.turn) and board.king(board.turn) == chess.E8:
                return 0.2

        return 0

    
    @staticmethod
    def chess_h1(s:State):
        """ euristica basata sul material score, per funzionare richiede la libreria 'chess' """  
        
        board = s.representation
        
        # controllo che lo stato non sia contenga configurazione finale: caso in cui restituisco un valore
        # 'grande' per spingere l'algoritmo a scegliere la mossa che termina la partita
        if board.is_checkmate():
            return 1000000
        
        white_score = 0
        black_score = 0
        
        for i in range(0, 64):
            piece = board.piece_at(chess.parse_square(chess.SQUARE_NAMES[i]))
            
            # se in uno square non si trova alcuna pedina, viene ritornato None
            if piece is not None:
                if piece.color == chess.WHITE:
                    white_score += Heuristics.piece_value(piece)
                else:
                    black_score += Heuristics.piece_value(piece)
        
        score = white_score - black_score
        
        if board.turn == chess.WHITE:
            return score + Heuristics.evaluate_king_safety(board)
        else:
            return -score + Heuristics.evaluate_king_safety(board)
        
    @staticmethod
    def has_doubled_pawns(board, square):
        """ Check if there are doubled pawns on the same file at the given square. 
            Notice that it's assumed the piece retrieved is a pawn! """

        piece = board.piece_at(square)

        color = board.turn

        file = chess.square_file(square)
        rank = chess.square_rank(square)
        adjacent_rank = rank

        for i in range(0, 8):

            if adjacent_rank == 7:
                adjacent_rank = 0

            if adjacent_rank == rank:
                adjacent_rank += 1
                continue

            adjacent_square = chess.square(file, adjacent_rank)

            if (board.piece_at(adjacent_square) is not None) and \
                (board.piece_at(adjacent_square).color == color) and \
                (board.piece_at(adjacent_square).piece_type == chess.PAWN):
                return True

            adjacent_rank += 1

        return False
    
    @staticmethod
    def is_isolated_pawn(board, square):
        """ Check if a pawn on the given square is isolated (no friendly pawns on ADJACENT files, not the
            file where the pawn is actually in)."""
        piece = board.piece_at(square)
        if piece is not None and piece.piece_type == chess.PAWN:
            file = chess.square_file(square)
            rank = chess.square_rank(square)
            color = board.turn

            # Check the left and right adjacent files for friendly pawns
            for offset in [-1, 1]:

                adjacent_file = file + offset
                print(adjacent_file)
                adjacent_rank = rank

                if 0 <= adjacent_file <= 7:

                    for i in range(0, 8):

                        if adjacent_rank == 7:
                            adjacent_rank = 0

                        adjacent_square = chess.square(adjacent_file, adjacent_rank)

                        adjacent_rank += 1

                        if (board.piece_at(adjacent_square) is not None) and \
                            (board.piece_at(adjacent_square).color == color) and \
                            (board.piece_at(adjacent_square).piece_type == chess.PAWN):
                            return False

        # If no adjacent friendly pawns are found, it's an isolated pawn
        return True
    
    @staticmethod
    def chess_h2(s:State):   
        """ This function returns a combination of the
        
           - material score
           
           - pawn structure: isolated and doubled pawns
               
           - center control 
               a little 'reward' is given if the player makes a move to control the center """
    
        board = s.representation
        
        score = 0

        score += Heuristics.chess_h1(s)
        
        # if score == 1000000 then a final state has been found => return
        if score == 1000000:
            return score

        # Pawn structure (simple evaluation of doubled pawns and isolated pawns)
        for square in chess.SQUARES:
            piece = board.piece_at(square)
            if piece is not None:
                if piece == chess.PAWN:
                    if Heuristics.is_doubled_pawn(board, square):
                        score -= 0.5
                    if Heuristics.is_isolated_pawn(board, square):
                        score -= 0.5


        # Center control
        center_squares = [chess.D4, chess.E4, chess.D5, chess.E5]
        for square in center_squares:
            if board.piece_at(square) is not None:
                if board.piece_at(square).color == board.turn:
                    score += 0.2

        return score


In [52]:
import random
import heapq

class Algorithms:
    
    """ class containing the implementation of different search algorithms:
        - A* search, for single-agent worlds
        - minimax (with limited deep) 
        - alpha-beta pruning minimax 
        - class Node is implemented to manage A*, deeper information on report """
    
    ################################################### A* ##########################################################
    
    @staticmethod
    def insert(priority_queue, val, count, node:Node):
        """ convenience function to insert elements in the priority queue.
            Note: heapq implements min-heaps! 
            count is used to ensure that equal-valued tuples are mantained in insertion order """
        heapq.heappush(priority_queue, (val, count, node))
    
    @staticmethod
    def pop(priority_queue):
        priority, count, node = heapq.heappop(priority_queue)
        return priority, node
    
    @staticmethod
    def is_empty(priority_queue):
        return len(priority_queue) == 0
    
    @staticmethod
    def compute_solution(node:Node):
        """ function used to compute the sequence of actions leading to a final state.
            In particular, the sequence of actions is reconstructed backwards, starting from the input
            node, following the pointer to the father, until the root is found (the root has None parent pointer).
            For each visited node, the action is memorized in a list, which is returned.
            Notice that in this way actions are returned in the reverse order in which they have to be
            applied, so each new action is not appended at the end of the list, but is inserted as the first element.
            In this way, the last 'seen' is the first in the list, as it should be. """
        
        action_sequence = []
        
        action_sequence.insert(0, node.action)
        
        node = node.parent
        
        while node.parent is not None:
            
            action_sequence.insert(0, node.action)
            node = node.parent
        
        return action_sequence
            
    
    @staticmethod
    def a_star(s:State, g:PathCosts, h:Heuristics):
        
        count = 0
        """ This A* implementation, given an initial state, SOLVES the problem. That is, the algorithm returns
            a sequence of actions to be applied consecutively, state after state, in order to reach a final state,
            that is the goal (ie: solve the problem).
            In general , it can happen that a problem does not have a solution/the agent is not able to find one 
            so the algorithm for such cases returns None, indicating a failure
            The algorithm also returns the number of explored states """
        
        # computing h valuation and path cost for the initial state, initializing the first node, the queue and
        # the explored set. Inserting the first node to the queue
        h_val = h(s)
        g_val = 0
        f_val = h_val + g_val
        
        node = Node(s, None, None, g_val)
        frontier = []
        
        Algorithms.insert(frontier, f_val, count, node)
        count += 1
        
        explored = []
        
        while not Algorithms.is_empty(frontier):
            
            # recall that the first element of the tuple is the node valuation, the second is the node itself
            node = Algorithms.pop(frontier)[1]
            
            #if node.state.is_failure():
            #    continue
            
            if node.state.is_final():
                return Algorithms.compute_solution(node), len(explored)
            
            explored.append(node)
            
            for action in node.state.actions():
                
                # the state obtained applying action to s is computed
                new_state = node.state.apply(action)
                
                # the new state is evaulated under h
                h_val = h(new_state)
                # the path cost for the new node is evaluated under g
                g_val = g(node)
                
                f_val = h_val + g_val
                
                child = Node(new_state, action, node, g_val)
                
                # the new generated child can be already in the frontier, but with a different valuation:
                # in order to guarantee optimality, the algorithm must check whether the current child
                # is already in the frontier, and if so, if it now has a better valuation.
                # If yes, then we need to insert the child with the current valuation and remove the child
                # with the previous valuation from the frontier.
                # If no, then we check if the child has been already visited, that is: if it is in the 'explored'
                # set. If so, we simply don't insert it in the frontier.
                
                index_to_remove = -1
                for i in range(0, len(frontier)):
                    # nodes are stored in the second component of the tuple
                    if frontier[i][1] == child:

                        # nodes valuations are stored in the first component of the tuple
                        # notice the - sign, recall that heapq implements only min-heaps, so
                        # we need to make the value positive again before comparing it with f_val!
                        if frontier[i][0] > f_val:
                            index_to_remove = i

                # if the child node is already in the frontier, with a worst valuation then the current one (f_val)
                # then the tuple (chil, old_val) is removed from the queue and (child, f_val) is inserted
                if index_to_remove > -1:
                    frontier.pop(index_to_remove)
                    Algorithms.insert(frontier, f_val, count, child)
                    count += 1
                    
                
                # finally, if child is not explored, it's added to the queue
                if child not in explored:
                    Algorithms.insert(frontier, f_val, count, child)
                    count += 1
            
        
        # if the algorithm has not returned anything until this point, then the whole tree
        # has been explored and no solution has been found, a FAILURE must be returned, (None)
        return None, len(explored)
    
    ################################################# GREEDY ########################################################
    @staticmethod
    def greedy(s:State, g:PathCosts, h:Heuristics):
        
        count = 0
        """ This Greedy implementation, given an initial state, SOLVES the problem. That is, the algorithm returns
            a sequence of actions to be applied consecutively, state after state, in order to reach a final state,
            that is the goal (ie: solve the problem).
            In general , it can happen that a problem does not have a solution/the agent is not able to find one 
            so the algorithm for such cases returns None, indicating a failure
            The algorithm also returns the number of explored states """
        
        # computing h valuation and path cost for the initial state, initializing the first node, the queue and
        # the explored set. Inserting the first node to the queue
        h_val = h(s)
        
        node = Node(s, None, None, 0)
        frontier = []
        
        Algorithms.insert(frontier, h_val, count, node)
        count += 1
        
        explored = []
        
        while not Algorithms.is_empty(frontier):
            
            # recall that the first element of the tuple is the node valuation, the second is the node itself
            node = Algorithms.pop(frontier)[1]
            
            explored.append(node)
            
            for action in node.state.actions():
                
                # the state obtained applying action to s is computed
                new_state = node.state.apply(action)
                
                # the new state is evaulated under h
                h_val = h(new_state)
                
                child = Node(new_state, action, node, 0)
                
                # when a state is firstly generated and is final, it's immediately returned
                if new_state.is_final():
                    return Algorithms.compute_solution(child), len(explored)
                    
                # if child is not explored, it's added to the queue
                if child not in explored:
                    Algorithms.insert(frontier, h_val, count, child)
                    count += 1
            
        
        # if the algorithm has not returned anything until this point, then the whole tree
        # has been explored and no solution has been found, a FAILURE must be returned, (None)
        return None, len(explored)
        
    
    ################################################# MINIMAX #######################################################
    
    @staticmethod
    def minimax_search(s:State, l:int, h:Heuristics):
        """ This minimax version is developed to be used in a real-time setting game.
            l represents the number of levels to explore during the search """
        
        def min_value(s:State, l:int, h:Heuristics, explored):
            
            explored += 1
        
            # base cases
            if s.is_final():
                return h(s), explored
            if l == 0:
                return h(s), explored

            # other case
            v = float('inf')
            for action in s.actions():
                res, explored = max_value(s.apply(action), l-1, h, explored)
                v = min(v, res)
                #explored += newly_explored

            return v, explored
    
        def max_value(s:State, l:int, h:Heuristics, explored):
            
            explored += 1

            # base cases
            if s.is_final():
                return h(s), explored
            if l == 0:
                return h(s), explored

            # other case
            v = float('-inf')
            for action in s.actions():
                res, explored = min_value(s.apply(action), l-1, h, explored)
                v = max(v, res)

            return v, explored
        
        valuated_actions = []
        
        explored = 0
        
        for action in s.actions():
            valuation, explored = min_value(s.apply(action), l, h, explored)
            #explored += newly_explored
            valuated_actions.append((action, valuation))
        
        sorted_actions = sorted(valuated_actions, key=lambda x: x[1])
        
        # if there are two or more actions with the same valuation, the choice is randomized
        
        # the best action (those with the maximum valuation) is selected
        best_valuation = sorted_actions[len(sorted_actions)-1][1]
        
        to_randomize = []
        
        # building up the list of action from which to choose
        for tup in sorted_actions:
            if tup[1] == best_valuation:
                to_randomize.append(tup)
                
        # if there is only one action the maximum valuation, then it's returned
        if len(to_randomize) == 1:
            return to_randomize[0][0], explored
        
        # otherwise, the choice is randomized on the list previously created (contains at leat two elements)
        else:
            return random.choice(to_randomize)[0], explored
    
    ######################################## MINIMAX ALPHA-BETA PRUNING ############################################
    
    @staticmethod
    def alpha_beta_search(s:State, l:int, h:Heuristics):
        """ This alpha-beta pruning minimax version is developed to be used in a real-time setting game.
            l represents the number of levels to explore during the search """
        
        def min_value(s:State, alpha, beta, l:int, h:Heuristics, explored):
            
            explored += 1
            
            # base cases
            if s.is_final():
                return h(s), explored
            if l == 0:
                return h(s), explored

            # other case
            v = float('inf')
            for action in s.actions():
                res, explored = max_value(s.apply(action), alpha, beta, l-1, h, explored)
                v = min(v, res)
                
                if v <= alpha:
                    return v, explored
                beta = min(beta, v)

            return v, explored
    
        def max_value(s:State, alpha, beta, l:int, h:Heuristics, explored):
            
            explored += 1

            # base cases
            if s.is_final():
                return h(s), explored
            if l == 0:
                return h(s), explored

            # other case
            v = float('-inf')
            for action in s.actions():
                res, explored = min_value(s.apply(action), alpha, beta, l-1, h, explored)
                v = max(v, res)
                
                if v >= beta:
                    return v, explored
                alpha = max(alpha, v)

            return v, explored
        
        valuated_actions = []
        
        explored = 0
        
        for action in s.actions():
            
            # initial alpha value = - infinity
            # initial beta value = + infinity
            valuation, explored = min_value(s.apply(action), float('-inf'), float('inf'), l, h, explored)
            valuated_actions.append((action, valuation))
        
        sorted_actions = sorted(valuated_actions, key=lambda x: x[1])
        
        # if there are two or more actions with the same valuation, the choice is randomized
        
        # the best action (those with the maximum valuation) is selected
        best_valuation = sorted_actions[len(sorted_actions)-1][1]
        
        to_randomize = []
        
        # building up the list of action from which to choose
        for tup in sorted_actions:
            if tup[1] == best_valuation:
                to_randomize.append(tup)
                
        # if there is only one action the maximum valuation, then it's returned
        if len(to_randomize) == 1:
            return to_randomize[0][0], explored
        
        # otherwise, the choice is randomized on the list previously created (contains at leat two elements)
        else:
            return random.choice(to_randomize)[0], explored

In [53]:
class Agent:
    
    """ class defining the concept of agent. An agent is described by:
        - a search algorithm the agent has to use (pointer to function defined in Algorithms)
        - an heuristic (pointer to function defined in Heuristics) - both for heuristics search and adversarial search
        - a path-cost function (pointer to function defined in PathCosts) - only for heuristic search
        - number of levels to explore (adversarial search, real-time games) """
    
    algorithm = None
    heuristic = None
    level = None
    path_cost = None
    
    
    def __init__(self, alg:Algorithms, heu:Heuristics, path_cost:PathCosts=None, level=None):
        self.algorithm = alg
        self.heuristic = heu
        
        if level is not None:
            self.level = level
            
        if path_cost is not None:
            self.path_cost = path_cost
        
    def __str__(self):
        return f"{self.algorithm},{self.heuristic},{self.path_cost},{self.level}"
    
    def solve(self, s:State):
        """ metodo pensato per giochi a giocatore singolo, in cui gli algoritmi restituiscono direttamente 
            una soluzione, cioè non è necessario selezionare una mossa alla volta """
        
        if self.path_cost is None:
            raise Exception('a path-cost function must be specified!')
        
        return self.algorithm(s, self.path_cost, self.heuristic)
        
    def select_action(self, s:State):
        """ metodo pensato per i giochi competitivi, in cui ogni agente sceglie una mossa alla volta, valutando
            lo stato in cui si trova l'ambiente al momento della scelta """
        
        if self.level is None:
            raise Exception('level must be an integer >= 0!')
        
        # gli algoritmi minimax e alpha-beta pruning restituiscono una tupla formata da:
        # - mossa selezionata
        # - valutazione dello stato risultante dall'applicazione di tale mossa
        return self.algorithm(s, self.level, self.heuristic)

In [54]:
from IPython.display import clear_output
import time
import stats
import pandas as pd

class Infrastructure:
    
    agents = []
    game = None
    
    def __init__(self, agents:list[Agent], game:GameInterface, stat:stats.Statistics):
        self.agents = agents
        self.game = game
        
    def __str__(self):
        return f"{self.agents}, {self.game}"
    
    def set_game(self, game:GameInterface):
        self.game = game
        
    def set_agents(self, agents:list[Agent]):
        self.agents = agents
    
    # main: environment simulation
    def main(self):
        
        if self.game is None:
            raise Exception('game model cannot be None!')
        
        if len(self.agents) == 0 or self.agents is None:
            raise Exception('instantiate with at least one agent!')
            
        # initial state
        s = State(self.game, self.game.get_initial_configuration())
        
        # single-player environment
        if len(self.agents) == 1:
            
            # varibale used to have an approximation of the cumulative time needed to solve each sub-problem
            cumulative_time = 0
            
            # varibale used to count the number of explored state
            cumulative_explored = 0
            
            failure = False
            
            while not failure:
            
                # recall: solve(s) return the sequence of actions needed to solve the problem!
                
                start_time = time.time()
                
                explored = 0
                
                solution, explored = self.agents[0].solve(s)
                
                end_time = time.time()
                
                cumulative_explored += explored
                
                cumulative_time += (end_time - start_time)
                
                # check whether the algorithm has returned a failure:
                if solution is not None:
                    for action in solution:
                        s = s.apply(action)
                        
                        time.sleep(0.3)
                        print(s.representation)
                        
                        if s.is_failure():
                            failure = True
                            print('failure')
                            print(self.game.outcome(s.representation))
                            outcome = self.game.outcome(s.representation)
                            num_moves = outcome['# Moves']
                            score = outcome['Score']
                            fail_cause = outcome['Termination']
                            
                            # add new record for statistics
                            record = [cumulative_time, cumulative_explored, num_moves, score, fail_cause]
                            stat.insert_record(record)
                            
                            break
                        
                        clear_output(wait=True)
                        
                    self.game.update(s.representation)
            
        # multi-player environment (at least 2 agents)
        elif len(self.agents) > 1:
            
            # testing/debugging
            print(s.representation)
            
            # dictionary collecting information of each player
            # each player is associated with:
            # [0] -> total game time (sum of times needed to make a move), initialized to 0
            # [1] -> number of moves, initialized to 0 (a player's turn basically means a move)
            # [2] -> total number of states explored to select the move, initialized to zero
            game_stats = {}
            
            for i in range(0, len(self.agents)):
                game_stats[i] = [0, 0, 0]
            
            while not s.is_final():
                for i in range(0, len(self.agents)):
                    
                    # l'agente i-esimo sceglie la propria azione
                    starting_time = time.time()
                    
                    current_agent_action, explored = self.agents[i].select_action(s)
                    
                    finishing_time = time.time()
                    
                    delta_t = finishing_time - starting_time
                    game_stats[i][0] += delta_t
                    game_stats[i][1] += 1
                    game_stats[i][2] += explored
                    
                    # l'azione scelta dall'agente i-esimo viene applicata allo stato attuale,
                    # un nuovo stato viene generato
                    s = s.apply(current_agent_action)
                        
                    time.sleep(0.3)
                    print(s.representation)
                    
                    # controllo necessario per gestire il caso in cui l'agente ai, i < len(agents) esegue
                    # un'azione che trasforma lo stato corrente in uno finale.
                    if s.is_final():
                        print('game ended!')
                        # in chess case, we have that the player 0 is white and the chess module
                        # returns True if White wins, False otherwise and None if drawn
                        result = self.game.outcome(s.representation)
                        termination = result['Termination']
                        
                        if result['Winner'] is not None:
                            if result['Winner'] is True:
                                winner = 'W'
                            else:
                                winner = 'B'
                        else:
                            winner = 'draw'
                        
                        # returning a dataframe containing collected game statistics and outcome info
                        df = pd.DataFrame(columns=['PLAYER', 'TIME', '# MOVES', '# EXPLORED', 'OUTCOME', 'HAS WIN'])
                        for i in range(0, len(self.agents)):
                            
                            l = []
                            
                            l.append(i)
                            
                            l.extend(game_stats[i])
                            
                            l.append(termination)
                            
                            if winner == 'draw':
                                l.append(0)
                            
                            if winner == 'W':
                                if i == 0:
                                    l.append(1)
                                else:
                                    l.append(0)
                            
                            if winner == 'B':
                                if i == 1:
                                    l.append(1)
                                else:
                                    l.append(0)
                                    
                            df.loc[i] = l
                        
                        return df
                        break
                    
                    clear_output(wait=True)
            

In [55]:
import datetime

In [56]:
def run_experiment(stat, agents, game, sample_size):

    i = Infrastructure(agents, snake_g, stat)
    
    for j in range(0, sample_size):
        print(f'collecting {j+1}-th sample ...')
        time.sleep(0.7)
        
        i.main()
        # the information collected during this experiment, is added to stat
        # that is, added to stat dataframe and will be used to calculate statistics
        print('collected! Starting again ...')

In [57]:
stat = stats.Statistics(['TIME', '# EXPLORED',  '# MOVES', 'SCORE', 'FAILURE CAUSE'])

In [58]:
a1 = Agent(Algorithms.greedy, Heuristics.snake_h3, path_cost=PathCosts.uniform_cost)

In [59]:
snake_g = SnakeGame()

In [60]:
n = int(input('Sample size: '))

Sample size: 1


In [61]:
run_experiment(stat, [a1], snake_g, n)

length:  34
distance:  12
score randomness:  0.7862255261099895
length:  34
distance:  11
score randomness:  1.1286006589295248
length:  34
distance:  14
score randomness:  0.18892323377280507
length:  34
distance:  15
score randomness:  1.552813408796752
length:  34
distance:  13
score randomness:  1.5736232819676963
length:  34
distance:  12
score randomness:  0.6842663986885729
length:  34
distance:  11
score randomness:  0.3087958911064191
length:  34
distance:  10
score randomness:  0.4382527365558985
length:  34
distance:  10
score randomness:  2.5682089625233875
length:  34
distance:  9
score randomness:  0.9151076713846845
length:  34
distance:  10
score randomness:  1.2520950642199744
length:  34
distance:  8
score randomness:  1.2162929533219462
length:  34
distance:  8
score randomness:  1.2910191805329416
length:  34
distance:  9
score randomness:  0.4867168547969287
length:  34
distance:  8
score randomness:  0.4703548120305184
length:  34
distance:  8
score randomness:  1

In [100]:
import datetime
current_datetime = datetime.datetime.now()
current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
file_name = 'experiment_snake_h3_' + current_datetime_str + '.csv'
df.to_csv(file_name)

# ------------------------------------------------------------------------------------------------------------

In [11]:
chess_gm = ChessGame()

In [21]:
# TEST: minimax alpha_beta h1 vs minimax h1, DEPTH 2

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=1)
a2 = Agent(Algorithms.minimax_search, Heuristics.chess_h1, level=1)

In [22]:
for j in range(0,5):

    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h1_alpha_beta_vs_minimax_depth_1_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

r . . . . . . r
. . . . k Q . p
. q . . P . . .
. . . . . . . .
p . . R . . P .
. . . . . P . .
. . . . . K . .
. . B . . . N .
game ended!


In [24]:
# TEST: minimax alpha_beta h1 vs minimax h1, DEPTH 2

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=2)
a2 = Agent(Algorithms.minimax_search, Heuristics.chess_h1, level=2)

for j in range(0,5):
    
    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h1_alpha_beta_vs_minimax_depth_2_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

r n . q . r . .
. . . . . . b p
. p . N b . k n
. . p p p p . .
p P P P . P p K
P . . . . . P .
. . R . P N B P
. . B . Q . . R
game ended!


In [25]:
# TEST: minimax alpha_beta h1 vs minimax h1, DEPTH 3

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=3)
a2 = Agent(Algorithms.minimax_search, Heuristics.chess_h1, level=3)

for j in range(0,5):
    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h1_alpha_beta_vs_minimax_depth_3_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

KeyboardInterrupt: 

In [61]:
# TEST: minimax alpha_beta h1 vs alpha_beta h1, player 1 depth = 3; player 2 depth = 1

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=3)
a2 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=1)

for j in range(0, 5):

    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    
    # saving the dataset
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h1_alpha_beta_vs_alpha_beta_depth_3_1_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

. . . . . . k r
. . . . . . b p
. . p . Q . p .
. . . . . . . q
. . . . P . . .
B . P . . . . N
. . . P . P P P
R . . . . . K .
game ended!


In [62]:
# TEST: minimax alpha_beta h1 vs alpha_beta h1, player 1 depth = 3; player 2 depth = 2

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=3)
a2 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=2)

for j in range(0,5):

    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h1_alpha_beta_vs_alpha_beta_depth_3_2_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

. . . k r b n r
p . . Q p . p .
. . . . . . . .
. B . . . p . p
. N . P P P . .
. . . K . . . P
P P P B . . . P
R . . . . . N R
game ended!


In [63]:
# TEST: minimax alpha_beta h1 vs alpha_beta h1, player 1 depth = player 2 depth = 3

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=3)
a2 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=3)

for j in range(0,5):
    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h1_alpha_beta_vs_alpha_beta_depth_3_3_1_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

r n b . k b n r
. p p p . p p p
. . . . . . . .
p . . . p . . .
. . . . . . P q
. P . . . P . .
P . P P P . . P
R N B Q K B N R
game ended!


In [64]:
# TEST: minimax alpha_beta h2 vs alpha_beta h1, player 1 depth = player 2 depth = 3

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h2, level=3)
a2 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h1, level=3)

for j in range(0,10):
    print(f'game {j+1}')
    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h2_alpha_beta_vs_h1_alpha_beta_depth_3_3_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

KeyboardInterrupt: 

In [12]:
# TEST: minimax alpha_beta h2 vs alpha_beta h2, player 1 depth = player 2 depth = 3

a1 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h2, level=3)
a2 = Agent(Algorithms.alpha_beta_search, Heuristics.chess_h2, level=3)

for j in range(0,6):
    print(f'game {j+1}')
    i = Infrastructure([a1, a2], chess_gm, None)
    df = i.main()
    
    current_datetime = datetime.datetime.now()
    current_datetime_str = current_datetime.strftime('%Y_%m_%d %H_%M_%S')
    file_name = 'h2_alpha_beta_vs_h2_alpha_beta_depth_3_3_'
    file_name += current_datetime_str + '.csv'
    df.to_csv(file_name)

. . . . . r k .
. p . . . . . .
p . . n . . . .
. . . r . . . p
. P . . b K p .
N . . . P . P .
P . . . . . . P
. . . . R B R .
game ended!
