# Coevolutionary algorithm
with **GEP** and **Differential Evolution**

The basic idea of a coevolutionary algorithm is as follows: several populations evolve simultaneously, each optimizing a given objective function with its own optimization strategy. At the same time, the populations "fight" for a shared resource, which is redistributed in favor of the more effective ones while the algorithm runs.

## Run parameters
- **Shared resource $N_s$**. The resource $N_s$ is the total number of objective-function evaluations, i.e. the product of the population size and the number of iterations. Initially, the resource is treated as common and is divided equally among the populations.
- **Length of the adaptation interval $L_i$**. During a certain number of steps (the so-called adaptation interval), each algorithm works separately. The value of the adaptation interval is chosen by the researcher. It should be neither too small (the algorithms will not have time to show themselves) nor too large (there will be little time left for adaptation).
- **Amount of penalty for loss $P$**. The penalty is a certain percentage of the population size of an individual algorithm, by which the population size of each losing algorithm is reduced. It should be neither too small (the algorithms will not feel the changes) nor too large (the search procedure of an algorithm with a small population size becomes meaningless).
- **Size of the "social" card $C_s$**. "Social card" is a certain percentage of the population size of an individual algorithm, which is the minimum guaranteed population size. That is, we can reduce the losing population only until it reaches the minimum guaranteed size - "social card" (the necessity of this limitation has been proven in practice).
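
The interplay of the penalty $P$ and the social card $C_s$ can be sketched as follows. This is a minimal illustration, not the notebook's implementation: the function name `redistribute` and the rule that the winner receives exactly the slots freed by the losers are assumptions made for the example.

```python
def redistribute(sizes, winner, penalty, social_card):
    """Shrink every losing population and hand the freed slots to the winner.

    sizes       -- dict: algorithm name -> current population size
    winner      -- name of the algorithm that won the adaptation interval
    penalty     -- share of its size that a loser gives up (0.05 means 5%)
    social_card -- minimum guaranteed population size
    """
    new_sizes = dict(sizes)
    freed = 0
    for name, size in sizes.items():
        if name == winner:
            continue
        # A loser gives up a share of its size but never drops below the floor.
        shrunk = max(int(size * (1 - penalty)), social_card)
        # A population that is already below the floor is left untouched.
        shrunk = min(shrunk, size)
        freed += size - shrunk
        new_sizes[name] = shrunk
    # Assumption: the winner gets exactly what the losers lost.
    new_sizes[winner] += freed
    return new_sizes


sizes = {"gep": 100, "de": 100}
for _ in range(3):
    sizes = redistribute(sizes, winner="gep", penalty=0.05, social_card=10)
print(sizes)
```

Because slots only move between populations, the total population size stays constant from one adaptation interval to the next.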

## Evaluation of algorithms
Since the coevolutionary algorithm is based on competing strategies of algorithms, a fitness function must be introduced for subpopulations. With the help of this function, the best population is determined and given more opportunities for reproduction.

$\Large q_i=\sum_{k=0}^{L_i-1}\frac{L_i-k}{k+1} \cdot b_i(k)$

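where $k=0$ denotes the current step, $k=1$ the previous step, and so on; $i$ is the index of the population; $b_i(k)=1$ if the $i$-th population contained the best individual (among all populations) at step $k$, and $b_i(k)=0$ otherwise. The weight $\frac{L_i-k}{k+1}$ decreases with $k$, so recent wins count more than old ones.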

The scores obtained this way are the main criterion for judging the algorithms that take part in the coevolution, and they drive the redistribution of the resource.
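
As a small illustration of the formula for $q_i$, the score of one population can be computed from its win history. The function name `population_score` and the list representation of $b_i(k)$ are assumptions made for this sketch; the manager class in this notebook leaves the score as a placeholder.

```python
def population_score(best_flags):
    """Compute q_i from the win history of one population.

    best_flags[k] is b_i(k): 1 if the population held the overall best
    individual k steps ago (k = 0 is the current step), 0 otherwise.
    The length of the list plays the role of the adaptation interval L_i.
    """
    interval = len(best_flags)
    return sum(
        (interval - k) / (k + 1) * flag
        for k, flag in enumerate(best_flags)
    )


# A win at the current step weighs far more than a win at the oldest step.
recent_win = population_score([1, 0, 0, 0])
old_win = population_score([0, 0, 0, 1])
print(recent_win, old_win)
```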

## Efficiency of the coevolutionary algorithm
The following indicators were used as comparison criteria: reliability $R$ and speed $V$. Reliability $R$ is the number (in percentage) of successful launches of the algorithm out of the total number of launches. Speed $V$ is estimated by the generation number in which the algorithm finds a solution with a given accuracy.
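
Both indicators can be computed from a list of run results, as in the sketch below. The data layout (one entry per run, holding the generation in which the required accuracy was reached, or `None` for a failed run) and the choice to average $V$ over successful runs only are assumptions of this example.

```python
def reliability(success_generations):
    """Percentage of runs that reached the required accuracy."""
    successes = [g for g in success_generations if g is not None]
    return 100.0 * len(successes) / len(success_generations)


def speed(success_generations):
    """Mean generation number of the successful runs, or None if all failed."""
    successes = [g for g in success_generations if g is not None]
    if not successes:
        return None
    return sum(successes) / len(successes)


# Four runs: three succeeded (in generations 12, 30 and 18), one failed.
runs = [12, None, 30, 18]
print(reliability(runs), speed(runs))
```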

## Run Experiment

### Init params

In [None]:
# shared resource
SHARED_RESOURCE = 10000

# length of the adaptation interval
ADAPTATION_LENGTH = 25

# amount of penalty for loss
PENALTY = 0.05 # means 5% of population

# size of population
POPULATION_SIZE = 100

# size of the "social" card
SOCIAL_SIZE = 0.1 * POPULATION_SIZE # means 10% of population

# set the seed
SEED = 321

# storage for the parameters of the input regex
PARAMS = {}

# storages for test dataset
X, Y = [], []

# number of iterations of performance metric
N_ITER = 100

### Prepare Coevolutionary Manager

In [None]:
from typing import Callable

class CoevolutionaryManager:
    def __init__(
        self,
        verbose: bool = False,
    ):
        self.shared_resources = SHARED_RESOURCE
        self.adaptive_length = ADAPTATION_LENGTH
        self.penalty = PENALTY
        self.social_size = int(SOCIAL_SIZE)
        self.verbose = verbose
        
        self.algorithm_dict = {}
        self.current_winner = None
    
    def register_algorithm(self, algorithm_name):
        self.algorithm_dict[algorithm_name] = {
            'individuals': [],
            'population_number': 0,
            'status': 'stop',
            'score': 0,
            'population_size': POPULATION_SIZE,
            'validation_func': lambda _: True
        }
        
    def add_individual(
        self, 
        algorithm_name,
        individual_str: str,
        individual_metric: float
    ):
        self.algorithm_dict[algorithm_name]['individuals'].append(
            (individual_str, individual_metric)
        )
        
    def run_population(self, algorithm_name):
        self.algorithm_dict[algorithm_name]['status'] = 'run'
        self.algorithm_dict[algorithm_name]['population_number'] += 1
        
    def set_validation_function(self, algorithm_name, validation_func):
        self.algorithm_dict[algorithm_name]['validation_func'] = validation_func
        
    def get_population_score(self, algorithm_name):
        ### set the winner
        return 1
        
    def stop_population(self, algorithm_name):
        self.algorithm_dict[algorithm_name]['status'] = 'stop'
        self.algorithm_dict[algorithm_name]['score'] = self.get_population_score(algorithm_name)
        
    def is_adaptive_population(self, algorithm_name):
        return self.algorithm_dict[algorithm_name]['population_number'] < self.adaptive_length
    
    def get_resource(self):
        if self.shared_resources:
            self.shared_resources -= 1
            if self.shared_resources % 1000 == 0:
                print(f'--- Remaining resource: {self.shared_resources}')
            return True
        else:
            return False
        
    def get_population(self, population, algorithm_name):
        if self.current_winner == algorithm_name:
            new_population = []
            for el in population:
                if self.algorithm_dict[algorithm_name].get('validation_func')(el):
                    new_population.append(el)
        else:
            new_population = []
            population_length = len(population)
            current_length = self.algorithm_dict[algorithm_name]['population_size'] * (1 - self.penalty)
            current_length = int(current_length)
            if current_length < self.social_size:
                current_length = self.social_size
            self.algorithm_dict[algorithm_name]['population_size'] = current_length
            print(f'--- Current length of population: <{current_length}>')
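            for el in population:
                # individuals beyond the reduced size are passed through the
                # validation function so that they are not evaluated again
                if len(new_population) >= current_length:
                    el = self.algorithm_dict[algorithm_name]['validation_func'](el)
                new_population.append(el)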
        return new_population
    
    def __repr__(self):
        return '''
        1) <register_algorithm> // register algorithm
            for each population:
            2) <is_adaptive_population> // check whether coevolution has to start
            3) <run_population> // send signal about population starting
            if coevolutionary start:
                4) <get_population> // filter population after previous population
                    for each individual:
                    5) <get_resource> // get resource for evaluate individual
                        if resource is available:
                            6) <add_individual> // register individual and its metric
                7) <stop_population> // send signal about population stop
        '''

### Metrics

In [None]:
import time


class Metrics:
    @staticmethod
    def get_match_accuracy(regex, phrase, result):
        try:
            reg_result = regex.match(phrase).group()
            if reg_result == result:
                return 1
            return 0
        except AttributeError:
            return 0
    
    @staticmethod
    def get_performance_metric(regex, n_iter, test_strings):
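        # perf_counter is a monotonic, high-resolution clock meant for timing
        t0 = time.perf_counter()
        for _ in range(n_iter):
            for test_string in test_strings:
                _ = regex.match(test_string)
        # average time of a single match, in milliseconds
        elapsed_ms = (time.perf_counter() - t0) * 1000
        return elapsed_ms / n_iter / len(test_strings)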

### Expression Tree to Regular Expression Translator

In [None]:
import random
import re


class ETtoRegexTranslator:
    @staticmethod
    def lexical_analyzer(string):
        sep_symbols = ['(', ')', ',']
        tokens = []
        _current_token = ''
        for symbol in string:
            if symbol in sep_symbols:
                if _current_token != '':
                    tokens.append(_current_token)
                tokens.append(symbol)
                _current_token = ''
            else:
                _current_token += symbol
        return tokens

    @staticmethod
    def generator(tokens, current_state=None):
        global PARAMS
        result = []
        repeat = False
        for i, token in enumerate(tokens):
            if token in TERMINALS:
                # work with special terminals
                if token == 'escape':
                    result.append('\\')
                elif token == 'any':
                    result.append('.')
                elif token == 'range':
                    result.append(f'[{random.choice(PARAMS["range"])}]')
                else:
                    result.append(token)
            # add arguments
            elif token == ',' and current_state == 'alt':
                result.append('|')
            elif token == ')' and repeat is True and current_state is None:
                result.append(f'{"{"}{random.choice(PARAMS["repeat"])}{"}"}')
                repeat = False
            elif token == '(' and current_state in ['group', 'alt']:
                result.append(token)
            elif token == ')':
                current_state = None
                result.append(')')
            # check functions
            elif token == 'group':
                current_state = 'group'
            elif token == 'repeat':
                repeat = True
            elif token == 'alt':
                current_state = 'alt'
        return result
    
    @staticmethod
    def regex_compile(individual):
        individual = re.sub(
            r'\s', 
            '', 
            str(individual)
        )
        tokens = ETtoRegexTranslator.lexical_analyzer(individual)
        regex = ''.join(
            ETtoRegexTranslator.generator(tokens)
        ) + ')'
        try:
            return re.compile(regex), None
        except Exception as e:
            return None, f'error: {e}, string: {regex}'

### Incidence List to Regular Expression Translator

In [None]:
import re
from enum import Enum


class NodesTypes(Enum):
    params = -1
    seq = 0
    atom = 1
    any = 2
    repeat = 3
    alt = 4
    altgroup = 5
    group = 6
    range = 7
    escape = 8

class Nodes:
    nodes_types = {}
    max_id = 0

    def __init__(self):
        for node_type in NodesTypes.__members__:
            value = NodesTypes.__members__.get(node_type).value
            self.nodes_types[value] = node_type
            self.max_id = value

class ILtoRegexTranslator:
    def __init__(self, incidence_list, nodes, params):
        self.params = params
        self.incidence_list = incidence_list
        self.nodes = nodes

    @staticmethod
    def preprocess_incidence_list(incidence_list):
        previous_incidence = incidence_list[0]
        new_incidence_list = [previous_incidence[0], previous_incidence[1]]

        for incidence in incidence_list[1:]:
            if incidence[0] == incidence[1] == 0:
                new_incidence_list.append(-1)
                continue
            if previous_incidence[1] == incidence[0]:
                new_incidence_list.append(incidence[1])
                previous_incidence = incidence
                continue
            else:
                new_incidence_list.append(incidence[0])
                new_incidence_list.append(incidence[1])
                previous_incidence = incidence

        return new_incidence_list
    
    def get_regex(self, incidence_list):

        regex = ''

        is_alt = False
        is_group = False
        is_repeat = False

        for incidence in incidence_list:
            if incidence[0] == 0 and is_group:
                regex += ')'
            if incidence[0] == 3:
                continue
            match incidence[1]:
                case 0:
                    if incidence[0] == 0:
                        continue
                case 2:
                    regex += '.'
                case 3:
                    if not is_repeat:
                        is_repeat = True
                    continue
                case 5:
                    if is_alt:
                        regex += '|'
                        is_alt = False
                    else:
                        is_alt = True
                case 6:
                    regex += '('
                    is_group = True
                case 7:
                    _params = self.params.get('range')[0]
                    regex += f'[{_params}]'
                case 8:
                    regex += '\\'
                case _:
                    if incidence[0] in [1, 8]:
                        regex += self.nodes[incidence[1]]
            if is_repeat:
                _params = self.params.get('repeat')[0]
                regex += f'{_params}'
                is_repeat = False
        return regex
    
    def regex_compile(self, individual):
        try:
            return re.compile(self.get_regex(individual)), None
        except Exception as e:
            return None, e

### GEP class

In [None]:
import deap.tools  # a bare "import deap" does not load the tools submodule
import random
import warnings


def _validate_basic_toolbox(tb):
    assert hasattr(tb, 'select'), "The toolbox must have a 'select' operator."
    # whether the ops in .pbs are all registered
    for op in tb.pbs:
        assert op.startswith('mut') or op.startswith('cx'), \
            "Operators must start with 'mut' or 'cx' except selection."
        assert hasattr(tb, op), (
            "Probability for an operator called '{}' is specified, "
            "but this operator is not registered in the toolbox.".format(op)
        )
    # whether all the mut_ and cx_ operators have their probabilities assigned in .pbs
    for op in [attr for attr in dir(tb) if attr.startswith('mut') or attr.startswith('cx')]:
        if op not in tb.pbs:
            warnings.warn(
                '{0} is registered, but its probability is NOT assigned in Toolbox.pbs. '
                'By default, the probability is ZERO and the operator {0} '
                'will NOT be applied.'.format(op),
                category=UserWarning)


def _apply_modification(population, operator, pb):
    for i in range(len(population)):
        if random.random() < pb:
            population[i], = operator(population[i])
            del population[i].fitness.values
    return population


def _apply_crossover(population, operator, pb):
    for i in range(1, len(population), 2):
        if random.random() < pb:
            population[i - 1], population[i] = operator(population[i - 1], population[i])
            del population[i - 1].fitness.values
            del population[i].fitness.values
    return population


def gep_simple(population, toolbox, n_generations=100, n_elites=1,
               stats=None, hall_of_fame=None, verbose=__debug__):
    
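    def validate_ind(ind):
        # NOTE: `valid` is a read-only property of DEAP fitness objects, so the
        # assignment raises AttributeError and the individual is returned as is
        try:
            ind.fitness.valid = True
        except AttributeError:
            pass
        return ind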
    
    ## for coevolutionary (step 1)
    ce_manager.register_algorithm('gep')
    ce_manager.set_validation_function(
        algorithm_name='gep',
        validation_func=validate_ind
    )
    ##
    
    _validate_basic_toolbox(toolbox)
    logbook = deap.tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])
    
    ## for coevolutionary (step 2)
    is_adaptive_population = True
    ##

    for gen in range(n_generations + 1):
        
        ## for coevolutionary (step 2)
        is_adaptive_population = ce_manager.is_adaptive_population('gep')
        ##
        
        ## for coevolutionary (step 3)
        ce_manager.run_population('gep')
        ##
        
        ## for coevolutionary (step 4)
        if not is_adaptive_population:
            invalid_individuals = ce_manager.get_population(
                population=population,
                algorithm_name='gep'
            )
            invalid_individuals = [ind for ind in invalid_individuals if not ind.fitness.valid]
        else:
            invalid_individuals = [ind for ind in population if not ind.fitness.valid]
        ##       
        
        print(f'--- Size of population: <{len(invalid_individuals)}>')
        
        ind_errors = 0
        ind_calculates = 0
        is_end = False
        for ind in invalid_individuals:
            ## for coevolutionary (step 5)
            # ask for the resource first, so that no evaluation is spent without it
            if ce_manager.get_resource():
                ind.fitness.values = toolbox.evaluate(ind)
                try:
                    ## for coevolutionary (step 6)
                    ce_manager.add_individual(
                        algorithm_name='gep',
                        individual_str=ind,
                        individual_metric=ind.fitness.values
                    )
                    ind_calculates += 1
                except Exception as e:
                    ind_errors += 1
            else:
                is_end = True
                break
            ##
        
        if is_end:
            break
        
        if ind_errors:
            print(f'Errors percent while fitness calculations: '
                  f'{round(ind_errors/(ind_errors+ind_calculates),3)*100}%')

        # record statistics and log
        if hall_of_fame is not None:
            hall_of_fame.update(population)
        record = stats.compile(population) if stats else {}
        logbook.record(gen=gen, nevals=len(invalid_individuals), **record)
        if verbose:
            print(logbook.stream)

        if gen == n_generations:
            break

        # selection with elitism
        elites = deap.tools.selBest(population, k=n_elites)
        offspring = toolbox.select(population, len(population) - n_elites)

        # replication
        offspring = [toolbox.clone(ind) for ind in offspring]

        # mutation
        for op in toolbox.pbs:
            if op.startswith('mut'):
                offspring = _apply_modification(offspring, getattr(toolbox, op), toolbox.pbs[op])

        # crossover
        for op in toolbox.pbs:
            if op.startswith('cx'):
                offspring = _apply_crossover(offspring, getattr(toolbox, op), toolbox.pbs[op])

        # replace the current population with the offspring
        population = elites + offspring
        
        ## for coevolutionary (step 7)
        ce_manager.stop_population('gep')
        ##
        
    return population, logbook

In [None]:
import re
import random
from typing import List, Dict, Callable

import exrex
import geppy
import numpy as np
from deap import creator, base, tools


def regex_process(string, regex):
    re_comp = re.compile(regex)
    try:
        return re_comp.match(string).group()
    except AttributeError:
        return None


class GEPAlgorithm:
    
    _statistics_dict = {
        "avg": np.mean,
        "std": np.std,
        "min": np.min,
        "max": np.max
    }
    
    _seed = SEED
    _verbose = True
    
    def __init__(
        self,
        head_n: int,
        genes_n: int,
        terminals: List,
        functions: Dict,
        evaluate_function: Callable,
        population_size: int,
        population_number: int,
        n_elites: int,
    ):
        # init entities for algorithm
        self.terminals = terminals
        self.functions = functions
        
        # init GEP variables
        self.head_n = head_n
        self.genes_n = genes_n
        
        # test strings for check accuracy
        self.test_strings = []
        
        # X and Y for test dataset
        self.X = []
        self.Y = []
        
        # set evaluate function
        self.evaluate_function = evaluate_function
        
        # set the evolution params
        self.population_size = population_size
        self.population_number = population_number
        self.n_elites = n_elites
             
    @staticmethod
    def get_test_strings(
        input_regex: str,
        n_fuzzy_strings: int,
    ):
        return list(
            exrex.generate(
                input_regex, 
                limit=n_fuzzy_strings)
        )
    
    @staticmethod
    def create_training_set(
        test_strings: List,
        original_regex: str,
        process_func: Callable
    ):
        Y = []
        X = []
        for string in test_strings:
            X.append(string)
            Y.append(
                process_func(
                    string=string, 
                    regex=original_regex
                )
            )
        return X, Y
    
    def create_primitives_set(self):
        # terminals
        pset = geppy.PrimitiveSet('Main', input_names=self.terminals)

        # functions
        for function in self.functions.keys():
            pset.add_function(function, self.functions.get(function))
        return pset
    
    @staticmethod
    def create_individual():
        # to minimize the objective (fitness)
        creator.create("FitnessMin", base.Fitness, weights=(-1,))
        creator.create("Individual", geppy.Chromosome, fitness=creator.FitnessMin)
        return creator
    
    @staticmethod
    def create_popiulation(
        pset,
        creator,
        head_length: int = 5,
        genes_number_in_chromosome: int = 2,
    ):
        print('--- Set the formula for individual: '
              f'head: <{head_length}> and '
              f'gene in chromosome: <{genes_number_in_chromosome}>')
        toolbox = geppy.Toolbox()
        toolbox.register('gene_gen', geppy.Gene, pset=pset, head_length=head_length)
        toolbox.register(
            'individual', 
            creator.Individual, 
            gene_gen=toolbox.gene_gen, 
            n_genes=genes_number_in_chromosome,
            linker=group
        )
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        toolbox.register('compile', geppy.compile_, pset=pset)
        return toolbox
    
    def set_evalute_function(self, toolbox):
        toolbox.register('evaluate', self.evaluate_function)
        return toolbox
        
    def set_genetic_operators(self, toolbox, pset):
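        # NOTE: the DEAP documentation warns that roulette selection cannot be
        # used for minimization, while the fitness here is FitnessMin
        toolbox.register('select', tools.selRoulette)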
        ## general mutations whose aliases start with 'mut'
        # We can specify the probability for an operator with the .pbs property
        toolbox.register(
            'mut_uniform', 
            geppy.mutate_uniform, 
            pset=pset, 
            ind_pb=2 / (2 * self.head_n + 1)
        )
        toolbox.pbs['mut_uniform'] = 0.3
        # Alternatively, assign the probability along with registration using the pb keyword argument.
        toolbox.register('mut_invert', geppy.invert, pb=0.1)
        toolbox.register('mut_is_ts', geppy.is_transpose, pb=0.1)
        toolbox.register('mut_ris_ts', geppy.ris_transpose, pb=0.1)
        toolbox.register('mut_gene_ts', geppy.gene_transpose, pb=0.4)

        ## general crossover whose aliases start with 'cx'
        toolbox.register('cx_1p', geppy.crossover_one_point, pb=0.1)
        toolbox.pbs['cx_1p'] = 0.4   # just show that the probability can be overwritten
        toolbox.register('cx_2p', geppy.crossover_two_point, pb=0.2)
        toolbox.register('cx_gene', geppy.crossover_gene, pb=0.1)
        
        return toolbox
        
    def set_statistic(self):
        stats = tools.Statistics(key=lambda ind: ind.fitness.values[0])
        for statistic_name in self._statistics_dict.keys():
            stats.register(
                statistic_name, 
                self._statistics_dict[statistic_name]
            )
        return stats
    
    def launch_evolution(
        self,
        toolbox,
        stats
    ):
        random.seed(self._seed)

        pop = toolbox.population(
            n=self.population_size
        )
        # only record the best individual ever found in all generations
        hof = tools.HallOfFame(1)

        # start evolution
        pop, log = gep_simple(
            pop, 
            toolbox,
            n_generations=self.population_number,
            n_elites=self.n_elites,
            stats=stats,
            hall_of_fame=hof, 
            verbose=self._verbose
        )

        best = hof[0]
        return best
    
    def __call__(
        self, 
        input_regex: str,
        n_fuzzy_strings: int = 5,
    ):
        print('GEP algorithm starts...')
        print(f'--- Get regex: <{input_regex}>')
        self.test_strings = self.get_test_strings(
            input_regex=input_regex,
            n_fuzzy_strings=n_fuzzy_strings
        )
        
        print(f'--- Get test strings: <{len(self.test_strings)}>')
        
        self.X, self.Y = self.create_training_set(
            test_strings=self.test_strings,
            original_regex=input_regex,
            process_func=regex_process,
        )
        
        global X, Y
        X = self.X
        Y = self.Y
        
        pset = self.create_primitives_set()
        
        creator = self.create_individual()
        
        toolbox = self.create_popiulation(
            pset=pset,
            creator=creator,
            head_length=self.head_n,
            genes_number_in_chromosome=self.genes_n,
        )
        
        toolbox = self.set_evalute_function(toolbox)
        
        toolbox = self.set_genetic_operators(
            toolbox=toolbox, 
            pset=pset,
        )
        
        stats = self.set_statistic()
        
        return self.launch_evolution(
            toolbox=toolbox,
            stats=stats,
        )

#### Prepare terminals

In [None]:
import unicodedata
from geppy.core.symbol import _is_nonkeyword_identifier

def get_all_unicode_letters(start_code, stop_code):
    start_idx, stop_idx = [int(code, 16) for code in (start_code, stop_code)]
    characters = []
    for unicode_idx in range(start_idx, stop_idx + 1):
        characters.append(chr(unicode_idx))
    return characters

TERMINALS = []

# Latin upper
TERMINALS += get_all_unicode_letters('0041', '005A')

# Latin lower
TERMINALS += get_all_unicode_letters('0061', '007A')

# digits
TERMINALS += get_all_unicode_letters('0030', '0039')

# keep only the terminals that geppy accepts as symbol names
correct_terminals = [x for x in TERMINALS if _is_nonkeyword_identifier(x)]

TERMINALS = correct_terminals

# range, any and escape are treated as terminals in this work
TERMINALS += ['range', 'any', 'escape']

#### Prepare functions

In [None]:
# dummy functions for naming
def alt(*args):
    return args
    
def group(*args):
    return args

def repeat(*args):
    return args

# Key - function object
# Value - arity of function
FUNCTIONS = {
    alt: 2,
    group: 10,
    repeat: 1,
}

#### Prepare evaluation function

In [None]:
def gep_evaluate(individual):
    global X, Y
    regex = ETtoRegexTranslator.regex_compile(individual)[0]
    if not regex:
        return 1_000,
    
    # count accuracy metric
    accuracy = []
    for x, y in zip(X, Y):
        accuracy.append(
            Metrics.get_match_accuracy(
                regex=regex, 
                phrase=x, 
                result=y
            )
        )
    try:
        accuracy = sum(accuracy)/len(accuracy)
        res_metric = float(
            Metrics.get_performance_metric(
                regex=regex,
                n_iter=N_ITER,
                test_strings=X
        )) / accuracy
    except ZeroDivisionError:
        res_metric = 1_000

    return res_metric,
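
The way the fitness combines speed and accuracy can be isolated in a few lines. This is a hedged sketch: the helper name `combined_fitness` and its arguments are hypothetical, and the timing is passed in as a number so that the example is deterministic.

```python
def combined_fitness(avg_match_time_ms, accuracy, penalty=1_000):
    """Fitness to minimize: match time divided by the share of correct matches.

    A regex that matches nothing correctly (accuracy == 0) gets the fixed
    penalty instead of a division by zero.
    """
    if accuracy == 0:
        return penalty
    return avg_match_time_ms / accuracy


# Halving the accuracy doubles the fitness, so a fast but wrong regex loses.
print(combined_fitness(1.0, 1.0), combined_fitness(1.0, 0.5), combined_fitness(1.0, 0.0))
```

Dividing by the accuracy means that a regex must be both fast and correct to obtain a low fitness value.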

#### Prepare visualization of the best individual

In [None]:
from IPython.display import Image

rename_labels = {
    'group': 'group', 
    'alt': 'alt', 
    'repeat': 'repeat', 
    'escape': 'escape', 
    '_range': 'range'
}

def visualize(ind):
    geppy.export_expression_tree(ind, rename_labels, 'data/coevolutionary_tree.png')

### DE class

In [None]:
import re
import array
import random
from typing import List, Dict, Callable

import exrex
import numpy as np
from deap import creator, base, tools


def regex_process(string, regex):
    re_comp = re.compile(regex)
    try:
        return re_comp.match(string).group()
    except AttributeError:
        return None


class DEAlgorithm:
    
    _statistics_dict = {
        "avg": np.mean,
        "std": np.std,
        "min": np.min,
        "max": np.max
    }
    
    _de_dict = {
        'cr': 0.6,
        'f': 0.4,
        'mu': 300,
    }
    
    _verbose = True
    _seed = SEED
    
    def __init__(
        self,
        bounds,
        ndim,
        evaluate_function: Callable,
        population_size: int,
        population_number: int,
    ):
        # de params
        self.bounds = bounds
        self.ndim = ndim
        
        # evolution params
        self.population_size = population_size
        self.population_number = population_number
        
        # test strings for check accuracy
        self.test_strings = []
        
        # X and Y for test dataset
        self.X = []
        self.Y = []
        
        # evaluate function
        self.evaluate_function = evaluate_function
        
    @staticmethod
    def get_test_strings(
        input_regex: str,
        n_fuzzy_strings: int,
    ):
        return list(
            exrex.generate(
                input_regex, 
                limit=n_fuzzy_strings)
        )
        
    @staticmethod
    def create_training_set(
        test_strings: List,
        original_regex: str,
        process_func: Callable
    ):
        Y = []
        X = []
        for string in test_strings:
            X.append(string)
            Y.append(
                process_func(
                    string=string, 
                    regex=original_regex
                )
            )
        return X, Y
    
    def set_genetic_operators(self):
        
        toolbox = base.Toolbox()
        toolbox.register("attr_float", random.uniform, -3, 3)
        toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_float, self.ndim)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        toolbox.register("select", tools.selRandom, k=3)
        
        return toolbox
        
    @staticmethod
    def create_individual():
        # to minimize the objective (fitness)
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
        creator.create("Individual", array.array, typecode='d', fitness=creator.FitnessMin)
        return creator
    
    def set_evalute_function(self, toolbox):
        toolbox.register('evaluate', self.evaluate_function)
        return toolbox
    
    def set_statistic(self):
        stats = tools.Statistics(key=lambda ind: ind.fitness.values[0])
        for statistic_name in self._statistics_dict.keys():
            stats.register(
                statistic_name, 
                self._statistics_dict[statistic_name]
            )
        return stats
    
    def create_popiulation(self, toolbox):
        pop = toolbox.population(n=self._de_dict['mu'])
        return pop
    
    def launch_evolution(
        self,
        toolbox,
        stats,
        pop
    ):
        random.seed(self._seed)

        logbook = tools.Logbook()
        logbook.header = "gen", "evals", "std", "min", "avg", "max"

        # Evaluate the individuals
        fitnesses = toolbox.map(toolbox.evaluate, pop)
        for ind, fit in zip(pop, fitnesses):
            ind.fitness.values = fit
            
        hof = tools.HallOfFame(1)
        # Record the initial best so hof[0] exists even if the loop below never runs
        hof.update(pop)

        record = stats.compile(pop)
        logbook.record(gen=0, evals=len(pop), **record)
        print(logbook.stream)

        for g in range(1, self.population_number):
            for k, agent in enumerate(pop):
                a,b,c = toolbox.select(pop)
                y = toolbox.clone(agent)
                index = random.randrange(self.ndim)
                for i, value in enumerate(agent):
                    if i == index or random.random() < self._de_dict['cr']:
                        y[i] = a[i] + self._de_dict['f']*(b[i]-c[i])
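                y.fitness.values = toolbox.evaluate(y)
                # DEAP compares weighted fitness values, so with weights=(-1.0,)
                # ">" means "better", i.e. a lower raw objective value
                if y.fitness > agent.fitness:
                    pop[k] = y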
            hof.update(pop)
            record = stats.compile(pop)
            logbook.record(gen=g, evals=len(pop), **record)
            print(logbook.stream)

        return hof[0]
    
    def __call__(
        self,
        input_regex: str,
        n_fuzzy_strings: int = 5,
    ):
        print('#de# DE algorithm starts...')
        print(f'#de# Get regex: <{input_regex}>')
        self.test_strings = self.get_test_strings(
            input_regex=input_regex,
            n_fuzzy_strings=n_fuzzy_strings
        )
        
        print(f'#de# Get test strings: <{len(self.test_strings)}>')
        
        self.X, self.Y = self.create_training_set(
            test_strings=self.test_strings,
            original_regex=input_regex,
            process_func=regex_process,
        )
        
        global X, Y
        X = self.X
        Y = self.Y
        
        creator = self.create_individual()
        
        toolbox = self.set_genetic_operators()
        
        pop = self.create_popiulation(toolbox)
        
        toolbox = self.set_evalute_function(toolbox)
        
        stats = self.set_statistic()
        
        return self.launch_evolution(
            toolbox=toolbox,
            stats=stats,
            pop=pop,
        )
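
The inner loop of `launch_evolution` builds one trial vector per agent using the classic DE/rand/1/bin scheme: three randomly selected donors are combined as `a + f * (b - c)`, and each coordinate of the agent is replaced with probability `cr`, with one randomly chosen coordinate always replaced. The following is a minimal standalone sketch of that single step, without DEAP. The helper name `de_trial_vector` and its `rng` parameter are hypothetical and are not part of the notebook.

```python
import random


def de_trial_vector(target, a, b, c, f, cr, rng=random):
    """Build a DE/rand/1/bin trial vector from a target and three donors."""
    ndim = len(target)
    trial = list(target)
    # One coordinate is always taken from the mutant vector,
    # so at least one coordinate of the trial comes from a + f * (b - c).
    forced = rng.randrange(ndim)
    for i in range(ndim):
        if i == forced or rng.random() < cr:
            trial[i] = a[i] + f * (b[i] - c[i])
    return trial


rng = random.Random(0)
target = [0.0, 0.0, 0.0]
a, b, c = [1.0, 1.0, 1.0], [2.0, 2.0, 2.0], [0.5, 0.5, 0.5]

# cr=1.0 replaces every coordinate: 1.0 + 0.5 * (2.0 - 0.5) = 1.75
trial_full = de_trial_vector(target, a, b, c, f=0.5, cr=1.0, rng=rng)
# cr=0.0 replaces only the single forced coordinate
trial_min = de_trial_vector(target, a, b, c, f=0.5, cr=0.0, rng=rng)
print(trial_full)
print(trial_min)
```

In the class above the same step is followed by a greedy replacement: the trial vector takes the agent's place only if its fitness is better.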

#### Prepare evaluation function

In [None]:
_translator = None


def de_evaluate(individual):
    
    individual = np.array(
        [abs(x) for x in individual.tolist()]
    ).round().reshape((15 * 1, 2))
    
    global X, Y
    
    res = _translator.regex_compile(individual)
    regex = res[0]
    if not regex:
        return 1_000,
    
    # count accuracy metric
    accuracy = []
    for x, y in zip(X, Y):
        accuracy.append(
            Metrics.get_match_accuracy(
                regex=regex, 
                phrase=x, 
                result=y
            )
        )
    try:
        accuracy = sum(accuracy)/len(accuracy)
        res_metric = float(
            Metrics.get_performance_metric(
                regex=regex,
                n_iter=N_ITER,
                test_strings=X
            )
        ) / accuracy
    except ZeroDivisionError:
        res_metric = 1_000

    return res_metric,
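
The first step of `de_evaluate` turns the flat real-valued DE vector into a table of non-negative integer codes before passing it to the translator. The sketch below isolates that decoding step so it can be inspected on its own. The helper name `decode_individual` and the parameter `n_pairs` are hypothetical. Reading each row as one `(parent, child)` pair of the incidence list is an assumption based on the `incidence_list` format used in `set_globals`.

```python
import numpy as np


def decode_individual(individual, n_pairs=15):
    """Map a flat real vector to an (n_pairs, 2) array of non-negative integer codes."""
    # Take absolute values so negative genes still map to valid node ids
    values = np.abs(np.asarray(individual, dtype=float))
    # Round to the nearest integer code and group the genes in pairs
    return values.round().reshape((n_pairs, 2))


flat = [-1.2, 3.7] * 15          # 30 genes, matching ndim=15 * 2
pairs = decode_individual(flat)
print(pairs.shape)               # (15, 2)
print(pairs[0])                  # [1. 4.]
```

Because of the rounding, many different real vectors decode to the same table, so the DE search effectively works on a piecewise-constant fitness landscape.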

### Run

In [None]:
def clean_globals():
    global PARAMS, X, Y
    X, Y = [], []
    PARAMS = {}  
    
def set_globals():
    global PARAMS, NODES, _translator
    PARAMS = {
        'range': ['0-9'],
        'repeat': ['0,1']
    }
    
    NODES = {
        # functions (except any)
        -1: 'params', 
        0: 'seq', 
        1: 'atom', 
        2: 'any', 
        3: 'repeat', 
        4: 'alt', 
        5: 'altgroup', 
        6: 'group', 
        7: 'range', 
        8: 'escape', 
        
        # terminals (taken from the input regex)
        9: 'a', 
        10: 'b', 
        11: 'c', 
        12: '['
    }

    _translator = ILtoRegexTranslator(
        incidence_list = [(0, 4), (4, 5), (5, 1), (1, 9), (5, 3), (3, 1), (1, 10), (5, 1), (1, 11), (5, 2), (4, 5), (5, 7), (5, 8), (8, 12)],
        nodes = NODES,
        params = PARAMS
    )

#### GEP

In [None]:
set_globals()
ce_manager = CoevolutionaryManager()

gep = GEPAlgorithm(
    terminals=TERMINALS,
    functions=FUNCTIONS,
    head_n=5,
    genes_n=2,
    evaluate_function=gep_evaluate,
    population_size=POPULATION_SIZE,
    population_number=1_000, # stub value due to limited resources
    n_elites=2
)

best = gep(
    input_regex=r'ab?c.|[0-9]\['  # raw string avoids the invalid "\[" escape warning
)

#### See the evaluation metric of the best individual found by GEP

In [None]:
ETtoRegexTranslator.regex_compile(best)

In [None]:
gep_evaluate(best)

#### See the structure of the best individual found by GEP

In [None]:
visualize(best)
Image(filename='data/coevolutionary_tree.png')

In [None]:
clean_globals()

#### DE

In [None]:
set_globals()
# ce_manager = CoevolutionaryManager()

de = DEAlgorithm(
    bounds=[0, 11],
    ndim=15 * 2,
    evaluate_function=de_evaluate,
    population_size=POPULATION_SIZE,
    population_number=10, # stub value due to limited resources
)

best = de(
    input_regex=r'ab?c.|[0-9]\['  # raw string avoids the invalid "\[" escape warning
)

#### See the evaluation metric of the best individual found by DE

In [None]:
de_evaluate(best)

#### See the structure of the best individual found by DE

In [None]:
best = np.array(
    [abs(x) for x in best.tolist()]
).round().reshape((15 * 1, 2))

_translator.regex_compile(best)