In [1]:
import random
from queue import PriorityQueue
import logging
from time import perf_counter_ns
from math import ceil

In [2]:

# Seed handed to problem() so every run builds the same random instances.
SEED = 42

# Log bare messages (no level/logger prefix), showing INFO and above.
logging.basicConfig(level=logging.INFO, format="%(message)s")

In [3]:
class PrioritizedSet:
    """Pair a candidate collection with a sortable priority.

    The stored priority is ``100 - priority``, so a larger score given to the
    constructor compares as *smaller*. Ordering and equality look only at the
    stored priority; the wrapped ``item`` is never compared.

    Comparing against an object that has no ``priority`` attribute returns
    ``NotImplemented`` so Python can fall back to its default handling
    (``== None`` is ``False``, ``< 5`` raises ``TypeError``) instead of
    failing with ``AttributeError``.
    """

    def __init__(self, priority: int, s: set):
        # Invert the score so that a higher score sorts first in a min-ordered queue.
        self.priority = 100 - priority
        self.item = s

    def __lt__(self, other):
        if not hasattr(other, "priority"):
            return NotImplemented
        return self.priority < other.priority

    def __eq__(self, other):
        if not hasattr(other, "priority"):
            return NotImplemented
        return self.priority == other.priority

    def __gt__(self, other):
        if not hasattr(other, "priority"):
            return NotImplemented
        return self.priority > other.priority

In [4]:
def problem(N, seed=None):
    """Build a reproducible random list of integer lists.

    Args:
        N: size of the universe; every generated element lies in ``0..N-1``.
        seed: value passed to ``random.seed`` before generation.

    Returns:
        A list of between ``N`` and ``5*N`` lists. Each inner list holds the
        distinct values obtained from between ``N // 5`` and ``N // 2``
        random draws, so it may be shorter than the number of draws.
    """
    random.seed(seed)
    how_many = random.randint(N, N * 5)
    subsets = []
    for _ in range(how_many):
        draws = random.randint(N // 5, N // 2)
        members = set()
        for _ in range(draws):
            members.add(random.randint(0, N - 1))
        subsets.append(list(members))
    return subsets

In [5]:
def calculate_weight(r, g, s=frozenset(), threshold=1):
    """Score how much of candidate ``s`` is goal material not yet covered.

    Args:
        r: set of elements already covered.
        g: goal set.
        s: candidate collection (set or list) to evaluate. Defaults to empty.
        threshold: fraction; a score of at least ``threshold * 100`` is
            promoted to 100.

    Returns:
        An int percentage of the elements of ``s`` that lie in ``g - r``,
        rounded up, or 100 when it reaches the threshold. The percentage is
        relative to the size of ``s``, not of the goal. An empty candidate
        scores 0 (it contributes nothing) rather than dividing by zero.
    """
    if len(s) == 0:
        return 0
    # Compute the uncovered part once instead of once per element of s.
    missing = g - r
    w = ceil(100 * sum(x in missing for x in s) / len(s))
    return 100 if w >= threshold * 100 else w

In [6]:
def search(sets, goal, N, threshold=1):
    """Greedily pick collections from ``sets`` until their union covers ``goal``.

    Candidates are first queued by size relative to ``N`` (largest first) and
    the largest one is always selected. Each pass then pops candidates and
    scores them with ``calculate_weight`` against what is still uncovered:
    a score of 100 is selected immediately, a score of 0 is discarded, and
    anything in between is set aside. If a pass exhausts the queue without a
    score of 100, the best set-aside candidate is selected instead.

    Args:
        sets: iterable of collections (lists or sets) to choose from.
        goal: set of elements that must be covered.
        N: universe size used to normalise candidate sizes; must be non-zero.
        threshold: forwarded to ``calculate_weight``.

    Returns:
        The list of selected collections, or ``None`` when ``sets`` is empty
        or the remaining candidates cannot complete the cover.
    """
    discovered_state = 0
    options = PriorityQueue()  # candidates still to be examined
    unused = PriorityQueue()   # partially useful candidates set aside during a pass

    for ps in sets:
        options.put(PrioritizedSet(int(100 * len(ps) / N), ps))

    if options.empty():
        # PriorityQueue.get() would block forever on an empty queue.
        logging.info(f"explored state: {discovered_state}")
        return None

    result = [options.get().item]
    result_set = set(result[0])

    # Superset test rather than equality: selected sets may contain elements
    # outside the goal, and those must not prevent termination.
    while not result_set.issuperset(goal):
        while not options.empty():
            discovered_state += 1
            s = options.get().item
            if len(s) == 0:
                # An empty candidate adds nothing and cannot be scored.
                continue
            coverage = calculate_weight(result_set, goal, s, threshold)
            if coverage == 100:
                chosen = s
                break
            if coverage != 0:
                unused.put(PrioritizedSet(coverage, s))
        else:
            # No candidate reached the threshold in this pass.
            if unused.empty():
                result = None
                break
            chosen = unused.get().item

        result.append(chosen)
        result_set.update(chosen)
        # Give the set-aside candidates another chance on the next pass.
        while not unused.empty():
            options.put(unused.get())

    logging.info(f"explored state: {discovered_state}")
    return result

In [7]:
# Run the greedy search on progressively larger seeded instances and log,
# for each size, the elapsed time and the total size of the chosen sets.
for n in [5, 10, 20, 50, 100, 500, 1000]:
    sets = problem(n, SEED)
    goal = set(range(n))
    logging.info(f"N = {n}")
    start_time_ns = perf_counter_ns()
    result = search(sets, goal, n, threshold=0.5)
    end_time_ns = perf_counter_ns()
    logging.info(f"Time: {end_time_ns-start_time_ns} ns")
    if result is None:
        logging.info("No solution found")
    else:
        # "Weight" is the summed length of the selected collections.
        logging.info(f"the weight of the solution is: {sum(len(s) for s in result)}")