Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [19]:
from random import random
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
import copy
from tqdm.auto import tqdm

In [20]:
# Search node: `taken` holds the indices of the chosen sets, `not_taken` the
# indices of the sets that can still be added.
State = namedtuple('State', 'taken not_taken')


def covered(state):
    """Return a boolean mask of the elements covered by the sets in ``state.taken``.

    Entry ``k`` of the mask is true when at least one taken set contains
    element ``k``; with no taken sets the mask is all ``False``.
    """
    mask = np.array([False] * PROBLEM_SIZE)
    # Fold the taken sets into the mask one at a time.
    for index in state.taken:
        mask = np.logical_or(mask, SETS[index])
    return mask


def goal_check(state):
    """Tell whether the sets taken in ``state`` cover every element."""
    coverage = covered(state)
    return np.all(coverage)

In [21]:
# Problem instance: NUM_SETS random subsets of a universe of PROBLEM_SIZE
# elements; each element lands in a given subset with probability 0.3.
PROBLEM_SIZE = 100
NUM_SETS = 200
SETS = tuple(np.array([random() < 0.3 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))
# Sanity check: taking every set must cover the whole universe, otherwise the
# instance has no solution at all.
assert goal_check(State(set(range(NUM_SETS)), set())), "Problem not solvable"

## Depth First

In [207]:
# Depth-first search: the frontier is a LIFO stack (append / pop on the right).
frontier = deque()
state = State(set(), set(range(NUM_SETS)))  # start with nothing taken
frontier.append(state)

counter = 0
current_state = frontier.pop()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand the node: one successor per set that has not been taken yet.
        # `^ {action}` moves the index from `not_taken` over to `taken`.
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.pop()  # most recently pushed node first
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

22it [00:00, 7821.21it/s]

Solved in 22 steps (22 tiles)





## Breadth First

In [10]:
# Breadth-first search: the frontier is a FIFO queue (append right, pop left).
frontier = deque()
state = State(set(), set(range(NUM_SETS)))  # start with nothing taken
frontier.append(state)

counter = 0
current_state = frontier.popleft()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand the node: one successor per set that has not been taken yet.
        # `^ {action}` moves the index from `not_taken` over to `taken`.
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.popleft()  # oldest node first
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

139939it [01:01, 2284.63it/s]


KeyboardInterrupt: 

## Greedy Best First

In [5]:
def f(state):
    """Greedy best-first priority: the number of elements ``state`` leaves uncovered."""
    return PROBLEM_SIZE - sum(covered(state))

In [6]:
# Greedy best-first search: always expand the queued node with the lowest f().
# NOTE: entries are (priority, state) tuples, so nodes with equal priority are
# ordered by comparing the State tuples themselves.
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))  # start with nothing taken
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand the node: one successor per set that has not been taken yet.
        # `^ {action}` moves the index from `not_taken` over to `taken`.
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()  # lowest priority value first
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

5it [00:00, 499.17it/s]

Solved in 5 steps (5 tiles)





## A*

In [22]:
# HEURISTICS MADE BY THE TEACHER IN CLASS


def h(state):
    """Optimistic estimate of the sets still needed to complete the cover.

    Pretends that every additional set contributes as many new elements as
    the largest set in ``SETS`` contains, and returns the number of such sets
    needed to account for the elements ``state`` leaves uncovered.
    """
    uncovered_count = PROBLEM_SIZE - sum(covered(state))  # elements still missing
    biggest = max(map(sum, SETS))  # size of the largest set overall
    return ceil(uncovered_count / biggest)


def h2(state):
    """Optimistic estimate of the sets still needed, based on the best single gain.

    Pretends that every additional set contributes as many new elements as
    the set that currently adds the most uncovered elements.

    Returns:
        0 when ``state`` already covers everything; otherwise the number of
        such best-case sets needed for the missing elements, or
        ``float("inf")`` when no set in ``SETS`` adds any missing element
        (the cover cannot be completed).
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0

    # The uncovered mask does not depend on the set being scored, so build it once.
    uncovered = np.logical_not(already_covered)
    missing_size = PROBLEM_SIZE - int(np.sum(already_covered))  # elements still to cover
    # Largest number of missing elements any single set would add.
    largest_gain = max(int(np.sum(np.logical_and(s, uncovered))) for s in SETS)
    if largest_gain == 0:
        # Avoid dividing by zero: nothing can make progress from this state.
        return float("inf")
    return ceil(missing_size / largest_gain)


def h3(state):
    """Optimistic estimate of the sets still needed, using the largest gains.

    Scores every set in ``SETS`` by how many uncovered elements it would add,
    sorts the scores in decreasing order and counts how many of the top scores
    are needed for their sum to reach the number of missing elements.
    Overlaps between the sets are ignored, which keeps the estimate optimistic.

    Returns:
        0 when ``state`` already covers everything; otherwise the count
        described above, or ``float("inf")`` when even all scores together
        fall short of the missing elements (the cover cannot be completed).
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0

    # The uncovered mask does not depend on the set being scored, so build it once.
    uncovered = np.logical_not(already_covered)
    missing_size = PROBLEM_SIZE - int(np.sum(already_covered))  # elements still to cover
    # Per-set gains, biggest first.
    candidates = sorted(
        (int(np.sum(np.logical_and(s, uncovered))) for s in SETS),
        reverse=True,
    )

    # Walk the sorted gains with a running total instead of re-summing the
    # prefix on every step.
    reachable = 0
    for taken, gain in enumerate(candidates, start=1):
        reachable += gain
        if reachable >= missing_size:
            return taken
    return float("inf")


#MY HEURISTICS

# Cheaper of the two custom heuristics: it ranks the candidates only once.
# Significantly faster than h10, at the price of sometimes ending with one more tile.
def h8(state):
    """Estimate the sets still needed by replaying a fixed greedy ordering.

    The not-taken sets are ranked once by how many currently uncovered
    elements each would add, then consumed in that order until every element
    is covered. The coverage is updated after every pick, so elements shared
    by several picked sets are not counted twice.

    NOTE(review): the ranking is never refreshed, so a set is counted even
    when it adds nothing new by the time it is reached. The estimate can
    therefore exceed the true number of sets needed, and a search driven by
    it is not guaranteed to return a minimal cover.

    Returns:
        0 when ``state`` already covers everything; otherwise the number of
        sets consumed, or ``float("inf")`` when the not-taken sets cannot
        complete the cover.
    """
    # covered() builds a new array on each call, so updating it in place
    # below does not touch SETS or the state.
    already_covered = covered(state)
    if np.all(already_covered):
        return 0

    # The uncovered mask is the same for every candidate, so build it once
    # instead of once per sort key.
    uncovered = ~already_covered
    # Not-taken sets, the ones adding the most uncovered elements first.
    candidates = sorted(
        state.not_taken,
        key=lambda x: np.sum(SETS[x] & uncovered),
        reverse=True,
    )

    taken = 0
    for selected_candidate in candidates:
        already_covered |= SETS[selected_candidate]
        taken += 1
        if np.all(already_covered):
            return taken

    # Candidates exhausted with elements still missing: no completion exists.
    return float("inf")


# Greedy estimate that re-ranks the candidates after every pick. It generally
# gives higher-quality results than h8, but every pick rescans all the
# not-taken sets, so each call is noticeably more expensive; better suited to
# small problems.
def h10(state):
    """Estimate the sets still needed with a fully greedy cover.

    Repeatedly picks the not-taken set that adds the most currently uncovered
    elements, updates the coverage, and counts the picks until every element
    is covered.

    NOTE(review): A* was observed to need more iterations with this heuristic
    than with h8. Presumably that is because re-ranking yields a smaller
    estimate, which makes the search less greedy, rather than a bug here --
    TODO confirm.

    Returns:
        0 when ``state`` already covers everything; otherwise the number of
        greedy picks, or ``float("inf")`` when the not-taken sets cannot
        complete the cover.
    """
    # covered() builds a new array on each call, so updating it in place
    # below does not touch SETS or the state.
    already_covered = covered(state)
    if np.all(already_covered):
        return 0

    taken = 0
    while not np.all(already_covered):
        uncovered = ~already_covered
        # Gain of every candidate against the current coverage.
        gains = {x: int(np.sum(SETS[x] & uncovered)) for x in state.not_taken}
        if not gains:
            return float("inf")
        # Only the best candidate is needed, so a single scan replaces a full
        # sort; ties resolve to the first candidate in iteration order.
        selected_candidate = max(gains, key=gains.get)
        if gains[selected_candidate] == 0:
            # No candidate makes progress: stop instead of looping forever.
            return float("inf")
        already_covered |= SETS[selected_candidate]
        taken += 1

    return taken
 
 #The algorithms were developed with the sweet company of Paul-Raphael Spazzola and Alexandre Senouci

def f(state):
    """A* priority: sets already taken plus the ``h8`` estimate of sets still needed."""
    cost_so_far = len(state.taken)
    estimate = h8(state)
    return cost_so_far + estimate



In [23]:
# A* search: always expand the queued node with the lowest f() = cost + heuristic.
# NOTE: entries are (priority, state) tuples, so nodes with equal priority are
# ordered by comparing the State tuples themselves.
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))  # start with nothing taken
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand the node: one successor per set that has not been taken yet.
        # `^ {action}` moves the index from `not_taken` over to `taken`.
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))

        _, current_state = frontier.get()  # lowest priority value first
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

10it [00:02,  4.82it/s]

Solved in 10 steps (5 tiles)



