In [295]:
import random
import numpy as np
from numpy.typing import NDArray
import logging
from dataclasses import dataclass
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")

In [296]:
DEBUG = False
logging.getLogger().setLevel(logging.DEBUG if DEBUG else logging.INFO)

In [297]:
from typing import Optional


def tournament(population, k=2):
    return max(random.choices(list(population.keys()), k=k), key=lambda x: x.fitness)

class Individual:
    P: Optional[NDArray]
    N: Optional[int]

    def __init__(self, *, included_lists: NDArray, P: NDArray, N: int):
        self.included_lists = included_lists
        self._len = sum(map(lambda x: len(x), Individual.P[np.where(self.included_lists == 1)[0]]))
        self.covered_numbers = self._compute_coveredNumbers()
        self.isValid = self.covered_numbers.any()
        self.isGoal = (self.covered_numbers == 1).all()
        self.bloat = (len(self) - N)
        self.fitness = self._compute_fitness()

    def __repr__(self):
        return str(self.included_lists)

    def __len__(self):
        return self._len
    
    def __hash__(self):
        return hash(bytes(self.included_lists))

    def __matmul__(self, other):
        '''Performs crossover between self and other'''
        assert (Individual.P == other.P).all(), "Two invididuals must belong to the same problem!"
        assert Individual.N == other.N, "Two invididuals must belong to the same problem!"
        index = np.random.randint(len(Individual.P))
        new_included_lists = np.hstack([self.included_lists[:index], other.included_lists[index:]])
        return Individual(included_lists=new_included_lists, P=Individual.P, N=Individual.N)

    def __invert__(self):
        new_included_lists = self.included_lists[:]
        indexes = np.random.choice([True, False], p=[0.7, 0.3], size=len(new_included_lists))
        new_included_lists[indexes] = 1 - new_included_lists[indexes]
        return Individual(included_lists=new_included_lists, P=Individual.P, N=Individual.N)

    def _list_to_binary(self, list: NDArray):
        binary_mapping = np.zeros(Individual.N)
        binary_mapping[list] = 1
        return binary_mapping

    def _compute_coveredNumbers(self):
        covered_numbers = np.zeros(Individual.N)
        for i in np.where(self.included_lists == 1)[0]:
            binary_mapping = self._list_to_binary(Individual.P[i])
            covered_numbers = np.logical_or(covered_numbers, binary_mapping).astype(np.int32)
        return covered_numbers

    def _compute_fitness(self):
        individual_metrics = np.array([sum(self.covered_numbers), self.bloat])
        individual_objective = np.array([Individual.N, 0])
        distance = np.sqrt(np.sum((individual_metrics - individual_objective)**2))
        return -distance

    @classmethod
    def init(cls, P, N):
        cls.P = P
        cls.N = N

In [298]:
def problem(N, seed=None):
    random.seed(seed)
    return [
        list(set(random.randint(0, N - 1) for n in range(random.randint(N // 5, N // 2))))
        for n in range(random.randint(N, N * 5))
    ]

In [299]:
SEED = 42
MAX_COUNT_GENERATION = 100
MAX_PATIENCE = 5

In [300]:
for N in [20, 100, 500]:
    P = np.array(problem(N, seed=42), dtype=object)
    logging.info(f"Starting with N: {N}, len(P): {sum(map(lambda x: len(x), P))}")
    Individual.init(P, N)
    POPULATION_SIZE = max(N*5, 5000)
    OFFSPRING_SIZE = POPULATION_SIZE*4
    population = dict()

    old_fittest = None
    fittest = None

    for genome in [np.fromiter((np.random.choice([1, 0], p=[0.1, 0.9]) for _ in P), dtype=np.int8) for _ in range(POPULATION_SIZE)]:
        individual = Individual(included_lists=genome, P=P, N=N)
        if individual.isValid and individual not in population:
            # adding individual in population, the dict is used to leverage the fast in (not in) operator and to have just one copy of the same individual
            population[individual] = None
            # keeping track of fittest individual discovered so far
            if fittest is None or individual.fitness > fittest.fitness:
                fittest = individual
                # initialziation of old_fittest
                old_fittest = fittest

    logging.info(f"Starting fittest, fitness: {fittest.fitness:.2f}, W: {len(fittest)}, goal: {fittest.isGoal}")
    count_generation = 0
    patience = 0
    while count_generation < MAX_COUNT_GENERATION and patience < MAX_PATIENCE:
        count_generation += 1
        logging.info(f"----------- [Generation: {count_generation}/{MAX_COUNT_GENERATION}] -----------")
        offspring = dict()
        for _ in range(OFFSPRING_SIZE):
            p1, p2 = tournament(population=population, k=POPULATION_SIZE//3), tournament(population=population, k=POPULATION_SIZE//3)
            o = p1 @ p2 # crossover
            if np.random.rand() < .6:
                o = ~o # mutation
            # add generated individual into population (strategy 2 GA)
            if o.isValid:
                population[o] = None
        
        # poplation as a sorted list to keep the population size constant
        population = sorted(population.keys(), key=lambda x: x.fitness, reverse=True)[:POPULATION_SIZE]

        old_fittest = fittest
        if population[0].fitness > fittest.fitness:
            fittest = population[0]
            logging.info(f"New fittest found. Fitness: {fittest.fitness:.2f}, W: {len(fittest)}, goal: {fittest.isGoal}")

        if old_fittest == fittest:
            logging.info(f"No improvement, patience {patience}/{MAX_PATIENCE}")
            patience +=1
        
        population = {k: None for k in population}

    logging.info(f"N={N}, W={len(fittest)}, bloat={fittest.bloat/N*100:.2f}%, {fittest.isGoal}")

2022-11-06 13:43:15,707 INFO: Starting with N: 5, len(P): 32
2022-11-06 13:43:17,395 INFO: Starting fittest, fitness: -0.00, W: 5, goal: True
2022-11-06 13:43:17,396 INFO: ----------- [Generation: 1/100] -----------


KeyboardInterrupt: 