## First assignment (A* implementation)

In [28]:
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue
import numpy as np

The state is described by two sets of indices into `SETS`: the sets of tiles already taken and the sets of tiles not yet taken.
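As a minimal sketch of this representation, assuming a hypothetical toy problem with only three candidate sets (not the random instance generated in this notebook), taking a set moves its index from `not_taken` to `taken`. The helper name `take` is an illustration only:

```python
from collections import namedtuple

State = namedtuple("State", ["taken", "not_taken"])

# Hypothetical toy problem with three candidate sets, indexed 0..2.
start = State(taken=set(), not_taken={0, 1, 2})


def take(state, action):
    """Return a new state in which the set with index `action` is taken."""
    # `^ {action}` is a symmetric difference: it adds the index where it is
    # absent (taken) and removes it where it is present (not_taken).
    return State(state.taken ^ {action}, state.not_taken ^ {action})


after = take(start, 1)
print(after)
```

Because `^` builds new sets, the original state is left untouched, so the same state can be expanded once per available action.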

In [43]:
PROBLEM_SIZE = 20  # number of tiles; each entry of SETS is a boolean numpy array of this length
NUM_SETS = 30  # number of sets of tiles
TILE_PROBABILITY = 0.3  # probability that any single tile in a set is covered (True)

SETS = tuple(
    np.array([random() < TILE_PROBABILITY for _ in range(PROBLEM_SIZE)])
    for _ in range(NUM_SETS)
)
State = namedtuple('State', ['taken', 'not_taken'])

In [30]:
def goal_check(state):
    return np.all(reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    ))


def pessimistic_distance_from_the_solution(state):  # not used for A*, which needs an optimistic heuristic
    return PROBLEM_SIZE - sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))


### A* implementation:

I need to implement two functions to compute the priority of a state:
- the cost function: the cost paid to reach the current state
- the heuristic function: an estimate of how far the current state is from a solution
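The way the two functions combine into a single priority can be sketched as follows. This is only an illustration: the candidate names and their (cost, heuristic) values are made up, and the tie-breaking counter is an assumption of this sketch that the notebook's code does not use.

```python
import heapq
from itertools import count


def priority(cost, heuristic):
    """A* priority: cost paid so far plus estimated remaining distance."""
    return cost + heuristic


# Hypothetical (cost, heuristic) pairs for three candidate states.
candidates = {"a": (0.25, 1.0), "b": (0.5, 0.25), "c": (0.25, 0.5)}

tie_breaker = count()  # makes entries with equal priority pop in insertion order
frontier = []
for name, (g, h) in candidates.items():
    heapq.heappush(frontier, (priority(g, h), next(tie_breaker), name))

# Pop every entry: the lowest priority comes out first.
expansion_order = [heapq.heappop(frontier)[2] for _ in range(len(candidates))]
print(expansion_order)
```

States "b" and "c" share the same priority here, so the counter decides between them instead of the comparison falling through to the state itself.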

As the heuristic function I decided to use a quantity derived from the binomial cumulative distribution. I compute the probability that a set of tiles contains at least as many True entries as there are uncovered tiles; the optimistic assumption is that this set has all of its True entries in exactly the positions I still need. Because this probability grows as the state gets closer to the solution, I use 1 minus this probability as the measure of distance from the solution.
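To get a feel for the values this heuristic takes, here is a minimal sketch that evaluates it for a few counts of uncovered tiles. It assumes the same `PROBLEM_SIZE` and `TILE_PROBABILITY` as this notebook, and it swaps `scipy.stats.binom.cdf` for a hand-written sum over `math.comb` so that it runs without SciPy. The names `binomial_cdf` and `heuristic` are illustrative. As in the notebook's code, the value returned is the binomial CDF at the number of uncovered tiles, P(X <= k).

```python
from math import comb

PROBLEM_SIZE = 20        # same value as in the notebook
TILE_PROBABILITY = 0.3   # same value as in the notebook


def binomial_cdf(k, n, p):
    """P(X <= k) for X ~ Binomial(n, p), summed term by term."""
    k = min(k, n)
    return sum(comb(n, i) * p**i * (1 - p) ** (n - i) for i in range(k + 1))


def heuristic(uncovered):
    """Distance estimate: 1 - P(X > uncovered), i.e. the CDF at `uncovered`."""
    return binomial_cdf(uncovered, PROBLEM_SIZE, TILE_PROBABILITY)


for uncovered in (0, 5, 10, 20):
    print(uncovered, round(heuristic(uncovered), 4))
```

The estimate shrinks as fewer tiles remain uncovered and stays between 0 and 1. This sketch only illustrates the shape of the function; it does not show that the heuristic never overestimates the true remaining cost.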

For the cost function I used the number of taken sets, normalized to lie between 0 and 1 by dividing by the total number of sets. This normalization is necessary because otherwise the cost and the heuristic would not be on comparable scales: the priority key would depend almost entirely on the cost function, and the search would behave like Dijkstra's algorithm. I also multiply this quantity by alpha to widen its range to between 0 and alpha, which makes the exploration more complete (without it there were some rare cases in which the algorithm did not find the best solution). I set alpha = 2; a larger value gives more weight to the cost function and leads to a more complete exploration of the solutions.
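The effect of the normalization and of alpha on the range of the cost can be sketched as follows, assuming the notebook's `NUM_SETS = 30`. The function name `cost` and the sampled alpha values are illustrative only:

```python
NUM_SETS = 30  # same value as in the notebook


def cost(num_taken, alpha=2):
    """Number of taken sets, normalized by NUM_SETS and scaled by alpha."""
    return alpha * num_taken / NUM_SETS


for alpha in (1, 2, 4):
    # Cost with no sets taken, half of them taken, and all of them taken.
    print(alpha, [cost(n, alpha) for n in (0, 15, 30)])
```

With alpha = 1 the cost spans the same 0 to 1 range as the heuristic; a larger alpha stretches the cost so that each additional taken set weighs more in the priority.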


In [46]:
from scipy.stats import binom

def not_covered(state):
    return PROBLEM_SIZE - sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))

def heuristic_function(state):
    # binom.cdf(k, n, p) is P(X <= k), so 1 - cdf is the tail P(X > k), which grows as fewer tiles remain uncovered
    # (P(X >= k) would be 1 - binom.cdf(k - 1, n, p))
    prob_greater_equal = (1 - binom.cdf(not_covered(state), PROBLEM_SIZE, TILE_PROBABILITY))
    # take the complement to get a heuristic that decreases towards the solution, on a scale comparable to the cost function
    distance_from_solution_h = 1 - prob_greater_equal
    # ^^^^^ this is just the cdf; the double "1 - ..." is kept only for clarity
    return distance_from_solution_h  # value between 0 and 1

def cost_function(state):
    alpha = 2
    return (1/NUM_SETS) * len(state.taken) * alpha  # value between 0 and alpha; a larger alpha gives more weight to the cost

In [32]:
def a_star(state):
    # frontier ordered by priority = cost so far + heuristic estimate
    frontier = PriorityQueue()
    frontier.put((cost_function(state) + heuristic_function(state), state))

    counter = 0
    _, current_state = frontier.get()
    while not goal_check(current_state):
        counter += 1
        # expand the current state: try taking each set that is not taken yet
        for action in current_state.not_taken:
            new_state = State(current_state.taken ^ {action}, current_state.not_taken ^ {action})
            frontier.put((cost_function(new_state) + heuristic_function(new_state), new_state))
        # continue from the state with the lowest priority
        _, current_state = frontier.get()

    print(f"Solved in {counter:,} steps ({len(current_state.taken)} sets of tiles)")
    print(f"Solution: {current_state}")
    

In [47]:
assert goal_check(
    State(set(range(NUM_SETS)), set())
), "Problem not solvable"

state = State(set(), set(range(NUM_SETS)))
a_star(state)

Solved in 12 steps (4 sets of tiles)
Solution: State(taken={8, 9, 3, 5}, not_taken={0, 1, 2, 4, 6, 7, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29})


Code written by the professor, useful for comparing results and performance:

In [45]:
frontier = SimpleQueue()  # FIFO -> breadth-first (looks for the minimum number of sets of tiles)
# frontier = LifoQueue()  # LIFO -> depth-first (looks for the minimum number of steps)
# frontier = PriorityQueue()  # sorted by key (e.g. keyed on the distance from the solution it gives greedy best-first)

state = State(set(), set(range(NUM_SETS)))
frontier.put((pessimistic_distance_from_the_solution(state), state))

counter = 0
_, current_state = frontier.get()
while not goal_check(current_state):
    counter += 1
    for action in current_state.not_taken:
        new_state = State(
            current_state.taken ^ {action},
            current_state.not_taken ^ {action},
        )
        frontier.put((pessimistic_distance_from_the_solution(new_state), new_state))
    _, current_state = frontier.get()

print(f"Solved in {counter:,} steps ({len(current_state.taken)} sets of tiles)")
print(f"Solution: {current_state}")

Solved in 49,039 steps (4 sets of tiles)
Solution: State(taken={1, 3, 21, 14}, not_taken={0, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29})
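
The three frontier types mentioned in the comments of the professor's code differ only in the order in which they hand items back. A minimal sketch of that difference, using made-up `(key, label)` items and a hypothetical helper `drain`:

```python
from queue import LifoQueue, PriorityQueue, SimpleQueue


def drain(frontier, items):
    """Put every item in the frontier, then get them all back in order."""
    for item in items:
        frontier.put(item)
    return [frontier.get() for _ in items]


# Hypothetical (key, label) pairs, inserted in this order.
items = [(3, "c"), (1, "a"), (2, "b")]

fifo_order = drain(SimpleQueue(), items)        # oldest first: breadth-first
lifo_order = drain(LifoQueue(), items)          # newest first: depth-first
priority_order = drain(PriorityQueue(), items)  # smallest key first: best-first

print(fifo_order)
print(lifo_order)
print(priority_order)
```

Only the priority queue looks at the key; the other two ignore it, which is why the same search loop can switch strategy by swapping the frontier.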
