In [None]:
# Size of the universe: the goal state covers every number in range(N).
N = 20
# Seed handed to problem() so the generated instance is reproducible.
SEED = 42
# Beam limit read as a global by search(): actions whose index is > beam are skipped.
beam = 20

In [None]:
import logging
import random
from math import exp
import numpy as np
from copy import copy, deepcopy
from typing import Callable
from gx_utils import *

# Log bare messages (no level/logger prefix); INFO and above are shown, DEBUG is hidden.
logging.basicConfig(format="%(message)s", level=logging.INFO)

In [None]:
def problem(N, seed=None):
    """Generate a random collection of integer lists drawn from ``range(N)``.

    The global ``random`` generator is re-seeded with ``seed`` first, so the
    same ``(N, seed)`` pair always yields the same instance.

    Args:
        N: size of the universe; every generated number is in ``0..N-1``.
        seed: value passed to ``random.seed``.

    Returns:
        A list of between ``N`` and ``5 * N`` lists. Each inner list holds the
        distinct values obtained from between ``N // 5`` and ``N // 2`` draws.
    """
    random.seed(seed)

    # The number of lists is drawn first, then each list's draw count, then
    # its members: this keeps the sequence of RNG calls fixed.
    how_many = random.randint(N, N * 5)
    generated = []
    for _ in range(how_many):
        draws = random.randint(N // 5, N // 2)
        unique_values = set()
        for _ in range(draws):
            unique_values.add(random.randint(0, N - 1))
        generated.append(list(unique_values))
    return generated

In [None]:
class State:
    """Hashable search node wrapping a ``(covered, remaining)`` pair.

    ``data[0]`` is the set of numbers covered so far; ``data[1]`` holds the
    sub-lists that can still be chosen (``None`` is accepted as well, it is
    only ever deep-copied here).

    Hashing, equality and ordering depend *only* on the covered set, so two
    states that cover the same numbers are considered the same node no matter
    which sub-lists remain.
    """

    def __init__(self, data: tuple):
        self._data = tuple(data)

    def _covered_key(self) -> frozenset:
        # Order-independent identity of the covered set. The previous
        # ``bytes(set)`` form depended on set iteration order (which can differ
        # between equal sets) and rejected values above 255.
        return frozenset(self._data[0])

    def __hash__(self):
        return hash(self._covered_key())

    def __eq__(self, other):
        if not isinstance(other, State):
            return NotImplemented
        return self._covered_key() == other._covered_key()

    def __lt__(self, other):
        # Deterministic tie-break: lexicographic comparison of the sorted
        # covered numbers.
        if not isinstance(other, State):
            return NotImplemented
        return sorted(self._data[0]) < sorted(other._data[0])

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    @property
    def data(self):
        """The wrapped ``(covered, remaining)`` tuple (not a copy)."""
        return self._data

    def copy_data(self):
        """Return an independent ``(covered, remaining)`` pair.

        The covered set is shallow-copied and the remaining lists are
        deep-copied, so mutating the result never affects this state.
        """
        return (copy(self._data[0]), deepcopy(self._data[1]))

In [None]:
def search(
    initial_state: State,
    goal_test: Callable,
    parent_state: dict,
    state_cost: dict,
    priority_function: Callable,
    unit_cost: Callable,
    max_cost = None
):
    """Generic frontier search driven by ``priority_function``.

    Starting from ``initial_state``, states are expanded through the global
    ``possible_actions`` / ``result`` helpers until ``goal_test`` accepts the
    current state or the frontier is exhausted. Only actions whose index is
    at most the global ``beam`` are considered.

    Args:
        initial_state: state the search starts from.
        goal_test: callable returning a truthy value for a goal state.
        parent_state: dict, cleared on entry and filled with
            ``state -> predecessor`` (the initial state maps to ``None``).
        state_cost: dict, cleared on entry and filled with
            ``state -> accumulated cost``.
        priority_function: callable giving the frontier priority of a state;
            it is called after ``state_cost`` has been set for that state.
        unit_cost: callable ``(state, action) -> cost`` of taking ``action``.
        max_cost: optional bound; successors whose accumulated cost would
            exceed it are discarded. ``None`` means unbounded.

    Returns:
        The list of ``State.copy_data()`` snapshots from the initial state to
        the goal, or an empty list when no goal state is reachable.
    """
    frontier = PriorityQueue()
    parent_state.clear()
    state_cost.clear()

    state = initial_state
    parent_state[state] = None
    state_cost[state] = 0

    while state is not None and not goal_test(state):
        for a in possible_actions(state):
            # Beam search: ignore actions beyond the beam limit
            # (indices 0..beam inclusive are kept).
            if a > beam:
                continue
            new_state = result(state, a)
            cost = unit_cost(state, a)
            new_cost = state_cost[state] + cost
            # Limit diameter. Compare against None explicitly so that a bound
            # of 0 is honoured instead of being treated as "no bound".
            if max_cost is not None and new_cost > max_cost:
                continue
            if new_state not in state_cost and new_state not in frontier:
                parent_state[new_state] = state
                state_cost[new_state] = new_cost
                frontier.push(new_state, p=priority_function(new_state))
                logging.debug(f"Added new node to frontier (cost={state_cost[new_state]})")
            elif new_state in frontier and state_cost[new_state] > new_cost:
                # NOTE(review): only cost/parent are updated here; the entry's
                # priority inside the frontier is left as it was. Confirm
                # whether PriorityQueue needs an explicit re-push.
                old_cost = state_cost[new_state]
                parent_state[new_state] = state
                state_cost[new_state] = new_cost
                logging.debug(f"Updated node cost in frontier: {old_cost} -> {state_cost[new_state]}")
        if frontier:
            state = frontier.pop()
        else:
            state = None

    # Frontier exhausted without reaching a goal: report it instead of
    # crashing on an empty path.
    if state is None:
        logging.warning(f"No solution found; visited {len(state_cost):,} states")
        return []

    # Walk the predecessor chain back to the initial state (whose parent is None).
    path = list()
    s = state
    while s is not None:
        path.append(s.copy_data())
        s = parent_state[s]

    # The path also contains the initial state, so the number of chosen lists
    # is one less than its length.
    logging.info(f"Found a solution with {len(path) - 1:,} lists; visited {len(state_cost):,} states")
    # Look the cost up with the goal state itself rather than rebuilding a
    # State from copied data.
    logging.info(f"number of elements in Ls: {state_cost[state]}")
    return list(reversed(path))

In [None]:
# Target node: every number in range(N) is covered. The second slot is None
# because State equality only looks at the covered set.
GOAL = State((set(range(N)), None))


def goal_test(state: State):
    """Return whether ``state`` compares equal to the module-level ``GOAL``."""
    reached = state == GOAL
    return reached


logging.info(f"Goal:\n{GOAL}")

In [None]:
def possible_actions(state: State):
    """Yield every valid action for ``state``.

    An action is the index of one of the entries in the state's second data
    slot (the lists that are still available).
    """
    available = state._data[1]
    return (index for index in range(len(available)))

In [None]:
def result(state: State, index):
    """Return the successor of ``state`` after choosing the list at ``index``.

    The state's data is copied first, so ``state`` itself is left untouched.
    The chosen list is removed from the available lists and its members are
    added to the covered set.
    """
    covered, remaining = state.copy_data()
    covered.update(remaining.pop(index))
    return State((covered, remaining))


In [None]:
# Start node: nothing covered yet, all lists of the generated instance still available.
INITIAL_STATE = State((set(), problem(N, SEED)))
# logging.info(f"Initial state:\n{INITIAL_STATE}")

## Breadth-First

In [None]:
# Bookkeeping dicts; search() clears and fills them, and the priority lambda
# below reads the very same state_cost object.
parent_state = {}
state_cost = {}

# Breadth-first: the priority is the number of states discovered so far, so it
# grows with every push (presumably the lowest priority is popped first,
# giving FIFO order; confirm against gx_utils.PriorityQueue).
# The cost of an action is the length of the list it selects.
final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    unit_cost=lambda state, a: len(state._data[1][a]),
    priority_function=lambda s: len(state_cost),
    state_cost=state_cost,
    parent_state=parent_state,
)


## Depth-First

In [None]:
# Bookkeeping dicts; search() clears and fills them, and the priority lambda
# below reads the very same state_cost object.
parent_state = {}
state_cost = {}

# Depth-first: the priority is the negated number of states discovered so far,
# so it shrinks with every push (presumably the lowest priority is popped
# first, giving LIFO order; confirm against gx_utils.PriorityQueue).
# The cost of an action is the length of the list it selects.
final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    unit_cost=lambda state, a: len(state._data[1][a]),
    priority_function=lambda s: -len(state_cost),
    state_cost=state_cost,
    parent_state=parent_state,
    # max_cost=200
)

## A*

In [None]:
# Bookkeeping dicts; search() clears and fills them, and the priority lambda
# below reads the very same state_cost object.
parent_state = {}
state_cost = {}


def h(s):
    """Heuristic: how many of the N numbers are not yet covered by ``s``."""
    covered = s._data[0]
    return N - len(covered)


# A*: priority = accumulated cost so far + heuristic estimate.
# The cost of an action is the length of the list it selects.
final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    unit_cost=lambda state, a: len(state._data[1][a]),
    priority_function=lambda s: (h(s) + state_cost[s]),
    state_cost=state_cost,
    parent_state=parent_state,
)