Copyright **`(c)`** 2023 Edoardo Vay  `<vay.edoardo@gmail.com>`

<https://github.com/Edoxy>

In [1]:
import numpy as np
from functools import reduce
from random import random, seed
from queue import PriorityQueue
from collections import namedtuple
from scipy.stats import binom
import scipy.special as sp

from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


# Problem Generation

In [2]:
seed(0)
PROBABILITY = 0.3
PROBLEM_SIZE = 100
NUM_SETS = 200

# 20% prob of being true using array compriansion and then converting to nparray
SETS = tuple(
    np.array([random() < PROBABILITY for _ in range(PROBLEM_SIZE)])
    for _ in range(NUM_SETS)
)

## removes double sets
SETS = set(tuple(tuple(set_) for set_ in SETS))
SETS = list(np.array(set_) for set_ in SETS)

## sort elements from the one with most zeros
SETS.sort(key=lambda x: -sum(x))
SETS = tuple(SETS)
NUM_SETS = len(SETS)

# create named tuple
State = namedtuple("State", ["taken", "not_taken"])

In [3]:
def goal_check(state):
    if state[0] == list() or state[0] == set():
        return False
    return np.all(reduce(np.logical_or, [SETS[i] for i in state[0]]))


assert goal_check((list(range(NUM_SETS)), list())), "Problem not solvable"

# Stochastic approach

### Probability estimation 

In [4]:
### PROBABILISTIC HEURISTIC ###
def prob_estimation(n, *, flag=1):
    """
    n: number of zeros
    flag = 1 (Default): Compute the probability that we generate 1 SET that exactly covers the zeros that we have in this state
    flag = 2: Compute the probability that we generate 2 SETS that together cover exactily the zeros we have in the state
    """
    if flag == 1:
        return PROBABILITY**n
    elif flag == 2:
        p = 0
        # is sums over all the possible cofigurations of two tiles that
        # it can have with that nuber of remaning spaces
        for i in range(1, n):
            # sum over all the possible i, the probability of having a set that covers i spaces (Binomial distribution) and covering the
            # remaning spaces with one set;
            p += binom.pmf(i, n, PROBABILITY) * PROBABILITY ** (n - i)
        return p

### Stochastic heuristic definition v1

In [5]:
def h_p(state):
    """
    Stochastic heuristic function based on the binomial distribution
    alpha: probability threshold that determins the probability of not being optimistic
    """

    alpha = 0.05
    # first part is deterministic
    reduction = reduce(np.logical_or, [SETS[i] for i in state[0]])
    n_zero = PROBLEM_SIZE - np.sum(reduction)
    if n_zero < 2:
        return n_zero
    else:
        n_not_taken = len(state.not_taken)
        # calculate the probability of having in the not_taken a set that covers exactily the remaning spaces
        # more precisely it calculates (1 - Prob(not having any set that covers the remaning spaces))
        if (1 - (1 - prob_estimation(n_zero, flag=1)) ** n_not_taken) > alpha:
            # being the steps dicrete, we add a priority based on the number of remaning spaces without ever overestimating
            return 1 + n_zero / PROBLEM_SIZE

        elif (
            1 - (1 - prob_estimation(n_zero, flag=2)) ** sp.binom(n_not_taken, 2)
        ) > alpha: # if it's very improbable that we can finish with one set, we esimate the same for two
            return 2 + n_zero / PROBLEM_SIZE
        else:
            return 3 + n_zero / PROBLEM_SIZE


def a_star_distance_p(state):
    return len(state.taken) + h_p(state)

### Stochastic heuristic definition v2

In [6]:
# optimized version that saves the probability value in a dictionary making it run much faster
# in fact the probability calculationa are only based on the number of not covered spaces ('zeros') indipendently from the set shape
global_LookupTable_p = dict()


def h_p2(state):
    """
    Stochastic heuristic function based on the binomial distribution
    alpha: probability threshold
    """
    alpha = 0.05
    reduction = reduce(np.logical_or, [SETS[i] for i in state[0]])
    n_zero = PROBLEM_SIZE - np.sum(reduction)
    if n_zero < 2:
        return n_zero
    else:
        n_not_taken = len(state.not_taken)

        if (n_zero, 1) not in global_LookupTable_p:
            global_LookupTable_p[(n_zero, 1)] = prob_estimation(n_zero, flag=1)
        if (n_zero, 2) not in global_LookupTable_p:
            global_LookupTable_p[(n_zero, 2)] = prob_estimation(n_zero, flag=2)

        if (1 - (1 - global_LookupTable_p[(n_zero, 1)]) ** n_not_taken) > alpha:
            return n_zero / PROBLEM_SIZE

        elif (
            1 - (1 - global_LookupTable_p[(n_zero, 2)]) ** sp.binom(n_not_taken, 2)
        ) > alpha:
            return 1 + n_zero / PROBLEM_SIZE
        else:
            return 2 + n_zero / PROBLEM_SIZE


def a_star_distance_p2(state):
    return len(state.taken) + h_p2(state)

### Stochastic heuristic definition v3

In [7]:
# final oprimization of the heuristic also saves the value of the probabilty basend on the number of not_taken that remain in the 
# state, further optimizing computational time. To reduce the number of value to save, we round the probability calculations so that 
# if the value are sufficiently close, we can use the precomputed one
prob_table = dict()
binom_table = dict()


def h_p3(state):
    """
    Stochastic heuristic function based on the binomial distribution
    state: a named tuple State() that rappresents a state in the problem

    ALPHA: probability of overestimating
    ROUNDING: number of digits used to round the probability
    """
    ALPHA = 0.05
    ROUNDING = 8

    reduction = reduce(np.logical_or, [SETS[i] for i in state[0]])
    n_zero = PROBLEM_SIZE - np.sum(reduction)
    if n_zero < 2:
        return n_zero
    else:
        n_not_taken = len(state.not_taken)

        ## table check
        if (n_zero, 1) not in prob_table:
            prob_table[(n_zero, 1)] = round(prob_estimation(n_zero, flag=1), ROUNDING)
        if (n_zero, 2) not in prob_table:
            prob_table[(n_zero, 2)] = round(prob_estimation(n_zero, flag=2), ROUNDING)

        if (prob_table[(n_zero, 1)], n_not_taken) not in binom_table:
            binom_table[(prob_table[(n_zero, 1)], n_not_taken)] = 1 - (
                (1 - prob_table[(n_zero, 1)]) ** n_not_taken
            )
        if (prob_table[(n_zero, 2)], n_not_taken) not in binom_table:
            binom_table[(prob_table[(n_zero, 2)], n_not_taken)] = 1 - (
                (1 - prob_table[(n_zero, 2)]) ** sp.binom(n_not_taken, 2)
            )
        ## end table check

        if binom_table[(prob_table[(n_zero, 1)], n_not_taken)] > ALPHA:
            return 1 - binom_table[(prob_table[(n_zero, 1)], n_not_taken)]

        elif binom_table[(prob_table[(n_zero, 2)], n_not_taken)] > ALPHA:
            return 1 + 1 - binom_table[(prob_table[(n_zero, 2)], n_not_taken)]

        else:
            return 2 + 1 - binom_table[(prob_table[(n_zero, 2)], n_not_taken)]


def a_star_distance_p3(state):
    return len(state.taken) + h_p3(state)

# A* algorithm

In [8]:
def action_func(state, action_index):
    '''
    This function creates the new element to add in the queue, based on the fact that we are using a list of ordered index of sets
    in this way we avoid putting in queue the same state more than one tiime
    '''
    new_taken = state.taken.copy()
    new_taken.append(state.not_taken[action_index])
    return State(new_taken, state.not_taken.copy()[action_index + 1 :])


# generic A star implementation
def A_star(init_state, action_func, goal_check, valid_dist):
    frontier = PriorityQueue()
    counter = 0
    current_state = init_state

    with tqdm(total=None) as pbar:
        while not goal_check(current_state):
            counter += 1

            for action_index in range(len(current_state.not_taken)):
                new_state = action_func(current_state, action_index)

                priority = valid_dist(new_state)
                frontier.put((priority, new_state))
            frontier.task_done()
            _, current_state = frontier.get()
            pbar.update(1)
        pbar.close()
    return counter, current_state

# Testing

## Stochastic A*

### Version 1

In [9]:
# # probilistic h
# counter, sol_state = A_star(
#     State(list(), list(range(NUM_SETS))),
#     action_list_func,
#     action_func,
#     goal_check,
#     a_star_distance_p,
# )
# print(f"Solved in {counter:,} steps, with {len(sol_state.taken)} sets")
# print(sol_state.taken)
# sum(sum([SETS[i] for i in sol_state.taken]))

### Version 2
Using 1 dictionary to store probability calculation

In [10]:
# # probilistic h
# counter, sol_state = A_star(
#     State(list(), list(range(NUM_SETS))),
#     action_func,
#     goal_check,
#     a_star_distance_p2,
# )
# print(f"Solved in {counter:,} steps, with {len(sol_state.taken)} sets")
# print(sol_state.taken)


### Version 3
Using 2 dictionaries to store calculation and added round function to store probabilities

In [13]:
# probilistic h
counter, sol_state = A_star(
    State(list(), list(range(NUM_SETS))),
    action_func,
    goal_check,
    a_star_distance_p3,
)
print(f"Solved in {counter:,} steps, with {len(sol_state.taken)} sets")
print(sol_state.taken)

380it [00:00, 952.98it/s] 


Solved in 380 steps, with 5 sets
[0, 1, 8, 10, 19]
