In [None]:
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue

import numpy as np

In [None]:
PROBLEM_SIZE = 5
NUM_SETS = 40
# SETS is a tuple that contains NUM_SETS arrays. Every array contains PROBLEM_SIZE boolean elements
# random() returns a random value between 0 e 1. The expression random() <.3 is true with a probability of 30%
SETS = tuple(
    np.array([random() < 0.30 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS)
)
# State is a nominated tuples class with two fields : 'taken' and 'not_taken', that are two sets of indexes 
State = namedtuple("State", ["taken", "not_taken"])

def goal_check(state):
    return np.all(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        )
    ) 

#it checks if goal is reacheable
assert goal_check(State(set(range(NUM_SETS)), set())), "Probelm not solvable"


In [None]:
#Heuristic tried during lecture. It is not admissible for A* algorithm beacuse it overestimates the distance from the goal
def distance(state):
    return PROBLEM_SIZE  - sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))  

The goal is to estimate the smallest number of pieces, among those not taken, needed to complete a line.

Step 1: Calculate logical union of all pieces in taken. It will be called u_taken.

Step 2: For each i in not_taken:
            Union u_taken with not_taken[i]
            Count the number of true values
            Add it to an array of values

Step 3: Find the maximum value in the array => m

Step 4: Is PROBLEM_SIZE - m equal to 0? If yes, return 1. Otherwise, return 2.

Note: If PROBLEM_SIZE - m equals 0, it means one step is missing from the optimal solution. Otherwise, we don't know how many steps are needed, but at least 2. By returning 2, I make an optimistic estimation of the remaining steps. This means that this Heuristic is admissible. Given the fact that it is monotonic also, it is possible to use this function for A*.

In [None]:
def admissible_distance(state):
    u_taken= reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ) 
     
    m=np.max(np.array([np.sum(reduce(np.logical_or,
            u_taken,
            SETS[i],)) for i in state.not_taken]) )
    
    if( PROBLEM_SIZE-m == 0):
        return 1
    else:
        return 2
    
        
    

Regarding the cost function, I simply consider the number of taken pieces.

In [None]:
def cost(state):
    return len(state.taken)

In [None]:
def f(state):
    return  admissible_distance(state) + cost(state)

In [None]:
####### LECTURE VERSION (Greedy best first)
frontier = (
    PriorityQueue()
)  
state=State(set(), set(range(NUM_SETS)))
frontier.put(
    (distance(state), state)
)  

counter = 0  # steps
_, current_state = (
    frontier.get()
)  
while not goal_check(current_state): 
    counter += 1
    for action in current_state[1]:  
        new_state = State(
            current_state.taken ^ {action}, current_state.not_taken ^ {action}
        )  # xor property: if two elements are equals => false else true. So it removes action from not_taken and it puts in taken
        frontier.put((distance(new_state) ,new_state)) 
    _, current_state = (
        frontier.get()
    )  

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles(pezzi))")
print(current_state.taken)
for i in current_state.taken:
    print(SETS[i])

In [None]:
####### Breadth first Version
frontier = (
    PriorityQueue()
)  
state=State(set(), set(range(NUM_SETS)))
frontier.put(
    (cost(state), state)
)  

counter = 0  #steps
_, current_state = (
    frontier.get()
)  
while not goal_check(current_state):  
    counter += 1
    for action in current_state[1]:  
        new_state = State(
            current_state.taken ^ {action}, current_state.not_taken ^ {action}
        )  # xor property: if two elements are equals => false else true. So it removes action from not_taken and it puts in taken
        frontier.put((cost(new_state) ,new_state)) 
    _, current_state = (
        frontier.get()
    ) 

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles(pezzi))")
print(current_state.taken)
for i in current_state.taken:
    print(SETS[i])

In [None]:
#A* VERSION
frontier = (
    PriorityQueue()
)  
state=State(set(), set(range(NUM_SETS)))
frontier.put(
    (f(state), state)
)  

counter = 0  #steps
_, current_state = (
    frontier.get()
)  
while not goal_check(current_state):  
    counter += 1
    for action in current_state[1]:  
        new_state = State(
            current_state.taken ^ {action}, current_state.not_taken ^ {action}
        )  # xor property: if two elements are equals => false else true. So it removes action from not_taken and it puts in taken
        frontier.put((f(new_state)  ,new_state))  
    _, current_state = (
        frontier.get()
    )  

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles(pezzi))")
print(current_state.taken)
for i in current_state.taken:
    print(SETS[i])

In [None]:
current_state

In [None]:
goal_check(current_state)