# Set Cover Problem

The set cover problem is a combinatorial optimization problem: given a universe of elements and a collection of subsets of that universe, find the smallest collection of subsets whose union covers every element. For example, with a universe of 10 elements and 5 subsets, the goal is to pick as few of those 5 subsets as possible while still covering all 10 elements.

Since the problem is NP-hard, no polynomial-time algorithm for it is known (and none exists unless P = NP), so for larger instances we have to rely on heuristic algorithms or on a search guided by a heuristic.
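
To make the problem statement concrete, here is a minimal brute-force sketch on a tiny hand-made instance. The function name `exact_min_cover` and the example subsets are my own illustrations, not part of the algorithms listed below. It tries every combination of subsets, which is exactly the exponential blow-up that makes this approach impractical for larger inputs.

```python
from itertools import combinations


def exact_min_cover(universe, subsets):
    """Return a smallest list of subsets covering `universe`, or None if impossible."""
    # Try combination sizes from 1 upward; the first hit uses the fewest subsets.
    for k in range(1, len(subsets) + 1):
        for combo in combinations(subsets, k):
            if set().union(*combo) >= universe:
                return list(combo)
    return None


# Hypothetical toy instance: 5 elements, 5 candidate subsets
universe = set(range(5))
subsets = [{0, 1}, {1, 2}, {2, 3, 4}, {0, 4}, {3}]
best = exact_min_cover(universe, subsets)
print(len(best), best)
```

With `m` subsets this checks up to `2**m - 1` combinations, so it is only usable as a reference on very small instances.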

In this notebook, I will attempt to solve the set cover problem using the following algorithms:
1. Naive Greedy
2. Greedy with a better cost function
3. Building a Fully Connected Graph and over-complicating things
4. Breadth First Traversal
5. A* Traversal

> Sidharrth Nagappan, 2022

### Necessary Imports

In [1]:
import random
from collections import deque
import itertools
import time

### Building the problem set

In [2]:
# setting a constant seed for reproducibility
SEED = 42

def problem(N, seed=SEED):
    random.seed(seed)
    return [
        list(set(random.randint(0, N - 1) for n in range(random.randint(N // 5, N // 2))))
        for n in range(random.randint(N, N * 5))
    ]

for n in [5]:
    print(f"N = {problem(n)}")

N = [[0], [1], [0], [4], [0], [1], [4], [4], [4], [1, 3], [0, 1], [2], [1], [0], [0, 2], [2, 4], [3], [3], [4], [2, 4], [0], [1], [0, 1], [3], [2, 3]]
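
Because the subsets are drawn at random, it is worth checking that an instance can be covered at all before running a solver on it. The sketch below is an assumption of mine (the helper name `is_coverable` does not appear elsewhere in this notebook) and uses the `N = 5` instance printed above.

```python
def is_coverable(N, subsets):
    """True if the union of all subsets contains every element of range(N)."""
    return set().union(*subsets) >= set(range(N))


# The N = 5 instance printed above
instance = [[0], [1], [0], [4], [0], [1], [4], [4], [4], [1, 3], [0, 1], [2],
            [1], [0], [0, 2], [2, 4], [3], [3], [4], [2, 4], [0], [1], [0, 1],
            [3], [2, 3]]

print(is_coverable(5, instance))
# Many subsets are repeated, so the number of distinct candidates is smaller
print(len(instance), len({tuple(s) for s in instance}))
```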


### Naive Greedy Algorithm

The naive greedy algorithm sorts the subsets by length, walks through them in that order, and adds a subset to the solution whenever it covers at least one new element. It is naive because it ignores how many new elements each subset actually contributes.

In [3]:
def greedy(N):
    goal = set(range(N))
    covered = set()
    solution = list()
    all_lists = sorted(problem(N, seed=42), key=lambda l: len(l))
    while goal != covered:
        x = all_lists.pop(0)
        # skip subsets that add nothing new (<= also catches set(x) == covered)
        if not set(x) <= covered:
            solution.append(x)
            covered |= set(x)

    print(
        f"Naive greedy solution for N={N}: w={sum(len(_) for _ in solution)} (bloat={(sum(len(_) for _ in solution)-N)/N*100:.0f}%)"
    )
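
To see where the bloat comes from, here is a small sketch of the same shortest-first strategy on a hand-made instance. The name `naive_greedy` and the example are hypothetical; unlike the cell above, this variant takes the subsets as a parameter and returns the solution instead of printing it.

```python
def naive_greedy(N, subsets):
    """Shortest-first greedy: keep any subset that adds at least one new element."""
    goal = set(range(N))
    covered = set()
    solution = []
    for candidate in sorted(subsets, key=len):
        if covered >= goal:
            break
        # subset-or-equal test: a candidate that adds nothing new is skipped
        if not set(candidate) <= covered:
            solution.append(candidate)
            covered |= set(candidate)
    return solution if covered >= goal else None


# One subset covers everything, but the two singletons are visited first
solution = naive_greedy(4, [[0, 1, 2, 3], [0], [1]])
w = sum(len(s) for s in solution)
print(solution, w)  # [[0], [1], [0, 1, 2, 3]] 6
```

The single subset `[0, 1, 2, 3]` would have been enough (w = 4), but sorting by length forces the two singletons in first, giving w = 6, a bloat of 50%.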

### Smarter Greedy Algorithm (Sorting at each step)

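In real-life scenarios, the cost depends on the relative price of visiting a node or choosing an option. Since no such price is attached to the subsets here, every subset gets a constant cost of 1. At each step, this version of the greedy algorithm takes the subset with the lowest $f_i$, the cost per newly covered element, where:

- $S_e$ is the expected solution (containing all the unique elements); the loop stops once the covered set $C$ equals $S_e$
- $n_i$ is the current subset
- $C$ is the set of elements covered so far
- $c_i$ is the cost of choosing $n_i$, set to 1 here, since there is no "business" cost associated with choosing a subset

The $+1$ in the denominator avoids a division by zero when a subset adds nothing new.

$$f_i = \frac{c_i}{|n_i \setminus C| + 1}$$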
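
As a worked example of the scoring, the sketch below computes the cost per newly covered element for three candidate subsets, using the same `cost / (new elements + 1)` expression as the cell that follows. The helper `score` and the candidate sets are made up for illustration.

```python
def score(subset, covered, cost=1):
    """Cost per newly covered element; the +1 avoids division by zero."""
    return cost / (len(subset - covered) + 1)


covered = {0, 1}
candidates = [{0, 1}, {1, 2}, {2, 3, 4}]

scores = [score(s, covered) for s in candidates]
best = min(candidates, key=lambda s: score(s, covered))
print(scores)  # [1.0, 0.5, 0.25]
print(best)    # {2, 3, 4}
```

The subset `{0, 1}` adds nothing and gets the worst score, while `{2, 3, 4}` adds three new elements and wins.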

In [4]:
def set_covering_problem_greedy(expected_solution, subsets, costs):
    # expected_solution: set of all elements; subsets: list of sets; costs: one cost per subset
    cost = 0
    visited = 0
    covered = set()
    final_solution = []
    while covered != expected_solution:
        # pick the subset with the lowest cost per newly covered element
        subset = min(subsets, key=lambda s: costs[subsets.index(s)] / (len(s - covered) + 1))
        final_solution.append(subset)
        cost += costs[subsets.index(subset)]
        visited += 1
        covered |= subset
    print("NUMBER OF VISITED NODES: ", visited)
    print("w: ", sum(len(_) for _ in final_solution))
    return final_solution, cost

### A* Traversal

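The A* algorithm needs a heuristic function $h$ that estimates the remaining cost between the current state and the goal state. To guarantee an optimal solution, $h$ must be admissible (it never overestimates); if it is also monotonic (consistent), no state ever has to be re-expanded. In the case of the set cover problem, the heuristic used here is the number of elements that are not yet covered by the current solution set. With the cost of a subset set to its length, this never overestimates, because covering $k$ missing elements takes subsets with a total length of at least $k$. There are two ways of implementing A*, we can either: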

1. build a fully connected graph and use an open and closed list to traverse
2. use a priority queue

For learning purposes, I will implement both, even though the second method is more efficient.
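
Before the versions built on the `helpers` module, here is a compact, self-contained sketch of the priority-queue variant. It is not the implementation used in this notebook: it relies on the standard `heapq` module instead of `PriorityQueue`, represents a state as the frozenset of covered elements instead of a `State` object, and assumes every subset only contains elements of `range(N)`. The function name `astar_set_cover` is hypothetical.

```python
import heapq
from itertools import count


def astar_set_cover(N, subsets):
    """A* over sets of covered elements; returns (total_weight, chosen) or None."""
    goal = frozenset(range(N))
    sets = [frozenset(s) for s in subsets]
    start = frozenset()
    best_g = {start: 0}      # cheapest known cost to reach each state
    parent = {start: None}   # state -> (previous state, subset that was added)
    tie = count()            # equal-priority entries pop in insertion order
    frontier = [(len(goal), next(tie), start)]  # priority f = g + h, with g = 0

    while frontier:
        f, _, state = heapq.heappop(frontier)
        g = best_g[state]
        if f > g + len(goal - state):
            continue  # stale entry: a cheaper path to this state was found later
        if state >= goal:
            chosen = []
            while parent[state] is not None:
                state, subset = parent[state]
                chosen.append(sorted(subset))
            return g, chosen[::-1]
        for s in sets:
            new_state = state | s
            if new_state == state:
                continue  # this subset adds nothing new
            new_g = g + len(s)  # cost of a subset = its length
            if new_g < best_g.get(new_state, float("inf")):
                best_g[new_state] = new_g
                parent[new_state] = (state, s)
                h = len(goal - new_state)  # heuristic: elements still uncovered
                heapq.heappush(frontier, (new_g + h, next(tie), new_state))
    return None


weight, chosen = astar_set_cover(5, [[0, 1], [1, 2], [2, 3, 4], [0, 4], [3]])
print(weight, chosen)
```

Keying states on the covered elements rather than on the list of chosen subsets means two different selections that cover the same elements collapse into one state, which keeps the number of visited states far lower on small instances.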

In [5]:
from helpers import State, PriorityQueue

# def are_we_done(N, discovered_elements):
#     '''
#     Checks if all elements are discovered
#     args:
#         N: number of elements to expect (final list is a range of this)
#         discovered_elements: list of discovered elements so far
#     '''
#     # flattened_list = list(itertools.chain.from_iterable(self.final_list))
#     for i in range(N):
#         if i not in discovered_elements:
#             return False
#     print("We are done")
#     return True

In [25]:
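from typing import Callable

import numpy as np


def add_to_state(st, subset):
    # append a subset to a copy of the state's data (same logic as AStarSearch.add_to_state below)
    state_list = st.copy_data().tolist()
    state_list.append(subset)
    return State(np.asarray(state_list, dtype=object))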


def astar_search(
    initial_state: State,
    subsets: list,
    are_we_done: Callable,
    parents: dict,
    cost_of_each_state: dict,
    priority_function: Callable,
    unit_cost: Callable,
    N: int
):
    frontier = PriorityQueue()
    parents.clear()
    cost_of_each_state.clear()
    
    state = initial_state
    parents[state] = None
    cost_of_each_state[state] = 0
    # to find length at the end without needed to flatten the state
    discovered_elements = []
    
    while state is not None and not are_we_done(N, state):
        for subset in subsets:
            # if this list has already been collected, skip
            if subset in state.copy_data():
                # print("Already in")
                continue
            new_state = add_to_state(state, subset)
            state_cost = unit_cost(subset)
            # if new_state not in cost_of_each_state or cost_of_each_state[new_state] > cost_of_each_state[state] + state_cost:
            if new_state not in cost_of_each_state and new_state not in frontier:
                parents[new_state] = state
                cost_of_each_state[new_state] = cost_of_each_state[state] + state_cost
                frontier.push(new_state, p=priority_function(new_state))
            elif new_state in frontier and cost_of_each_state[new_state] > cost_of_each_state[state] + state_cost:
                parents[new_state] = state
                cost_of_each_state[new_state] = cost_of_each_state[state] + state_cost
        if frontier:
            state = frontier.pop()
        else:
            state = None

    path = list()
    s = state

    while s:
        path.append(s.copy_data())
        s = parents[s]

    print(f"Path: {len(list(itertools.chain.from_iterable(path[0])))}")
    print(f"Found a solution in {len(path):,} steps; visited {len(cost_of_each_state):,} states")
    return list(reversed(path))

In [26]:
def are_we_done(N, state):
    flattened_list = list(itertools.chain.from_iterable(state.copy_data().tolist()))
    for i in range(N):
        if i not in flattened_list:
            return False
    return True

In [30]:
import numpy as np

N = 20
GOAL = State(np.array(range(N)))
subsets = problem(N, seed=42)
initial_state = State(np.array([subsets[0]]))

parents = dict()
cost_of_each_state = dict()

def h(state):
    num_undiscovered_elements = len(set(range(N)) - set(list(itertools.chain.from_iterable(state.copy_data().tolist()))))
    return num_undiscovered_elements

astar_search(
    initial_state = initial_state,
    subsets = subsets,
    are_we_done = are_we_done,
    parents = parents,
    cost_of_each_state = cost_of_each_state,
    priority_function = lambda state: cost_of_each_state[state] + h(state),
    unit_cost = lambda subset: len(subset),
    N = N
)

Path: 23
Found a solution in 5 steps; visited 34,742 states


[array([[8, 4, 7]]),
 array([list([8, 4, 7]), list([1, 3, 13, 14])], dtype=object),
 array([list([8, 4, 7]), list([1, 3, 13, 14]),
        list([2, 6, 8, 10, 12, 15, 18])], dtype=object),
 array([list([8, 4, 7]), list([1, 3, 13, 14]),
        list([2, 6, 8, 10, 12, 15, 18]), list([16, 9, 19, 6])],
       dtype=object),
 array([list([8, 4, 7]), list([1, 3, 13, 14]),
        list([2, 6, 8, 10, 12, 15, 18]), list([16, 9, 19, 6]),
        list([0, 5, 11, 16, 17])], dtype=object)]
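
As a sanity check on the output above, the short sketch below takes the final state from the printed path and confirms that it covers all 20 elements, recomputing the weight and the bloat with the same formula as the naive greedy cell. The variable names are my own.

```python
N = 20
# Final state copied from the path printed above
final_state = [
    [8, 4, 7],
    [1, 3, 13, 14],
    [2, 6, 8, 10, 12, 15, 18],
    [16, 9, 19, 6],
    [0, 5, 11, 16, 17],
]

covered = set().union(*final_state)
weight = sum(len(s) for s in final_state)
bloat = (weight - N) / N * 100

print(covered == set(range(N)))  # True
print(weight)                    # 23
print(f"{bloat:.0f}%")           # 15%
```

The weight of 23 matches the `Path: 23` line. Note that the search starts from a state that already contains `subsets[0]`, so this is the best cover found among those that include that first subset.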

In [33]:
class AStarSearch:
    def __init__(self, N, seed=42):
        # N is the number of elements to expect
        self.N = N
        self.seed = seed
    
    def add_to_state(self, st, subset):
        state_list = st.copy_data().tolist()
        state_list.append(subset)
        return State(np.asarray(state_list, dtype=object))

    def are_we_done(self, state):
        flattened_list = self.flatten_list(state.copy_data().tolist())
        for i in range(self.N):
            if i not in flattened_list:
                return False
        # print("We are done")
        return True
    
    def flatten_list(self, l):
        return list(itertools.chain.from_iterable(l))
    
    def h(self, state):
        '''
        Heuristic Function h(n) = number of undiscovered elements
        '''
        num_undiscovered_elements = len(set(range(self.N)) - set(self.flatten_list(state.copy_data().tolist())))
        return num_undiscovered_elements

    def astar_search(
        self,
        initial_state: State,
        subsets: list,
        are_we_done: Callable,
        parents: dict,
        cost_of_each_state: dict,
        priority_function: Callable,
        unit_cost: Callable,
        N: int
    ):
        frontier = PriorityQueue()
        parents.clear()
        cost_of_each_state.clear()
        
        state = initial_state
        parents[state] = None
        cost_of_each_state[state] = 0
        # to find length at the end without needed to flatten the state
        discovered_elements = []
        
        while state is not None and not are_we_done(N, state):
            for subset in subsets:
                # if this list has already been collected, skip
                if subset in state.copy_data():
                    # print("Already in")
                    continue
                new_state = self.add_to_state(state, subset)
                state_cost = unit_cost(subset)
                # if new_state not in cost_of_each_state or cost_of_each_state[new_state] > cost_of_each_state[state] + state_cost:
                if new_state not in cost_of_each_state and new_state not in frontier:
                    parents[new_state] = state
                    cost_of_each_state[new_state] = cost_of_each_state[state] + state_cost
                    frontier.push(new_state, p=priority_function(new_state))
                elif new_state in frontier and cost_of_each_state[new_state] > cost_of_each_state[state] + state_cost:
                    parents[new_state] = state
                    cost_of_each_state[new_state] = cost_of_each_state[state] + state_cost
            if frontier:
                state = frontier.pop()
            else:
                state = None

        path = list()
        s = state

        while s:
            path.append(s.copy_data())
            s = parents[s]

        print(f"Length of final list: {len(self.flatten_list(path[0]))}")
        print(f"Found a solution in {len(path):,} steps; visited {len(cost_of_each_state):,} states")
        return list(reversed(path))
    
    def search(self):
        GOAL = State(np.array(range(self.N)))
        subsets = problem(self.N, seed=self.seed)
        initial_state = State(np.array([subsets[0]]))

        parents = dict()
        cost_of_each_state = dict()

        self.astar_search(
            initial_state = initial_state,
            subsets = subsets,
            # astar_search calls are_we_done(N, state); the method only needs the state
            are_we_done = lambda _n, state: self.are_we_done(state),
            parents = parents,
            cost_of_each_state = cost_of_each_state,
            # use the instance's own heuristic and N instead of the notebook globals
            priority_function = lambda state: cost_of_each_state[state] + self.h(state),
            unit_cost = lambda subset: len(subset),
            N = self.N
        )

In [34]:
engine = AStarSearch(N=20, seed=42)
engine.search()

Length of final list: 23
Found a solution in 5 steps; visited 57,658 states
