In [204]:
import numpy as np
import more_itertools as mit
import itertools as it
import networkx as nx
import json
import time

In [205]:
class Individual:
    """One candidate solution: a partition of the graph's nodes into color classes.

    Constructor arguments:
        graph: the graph being colored. Only ``graph.has_edge(u, v)`` is used
            by this class.
        partition: list of lists of nodes; ``partition[c]`` holds the nodes
            that carry color ``c``.

    Attributes set by ``__init__``:
        no_faults_award: multiplier applied to the color count of a
            fault-free individual (see ``fitness_function``).
        num_of_colors: ``len(partition)`` (empty parts are counted too).
        coloring: dict mapping node -> color index.
        num_of_faults: total number of faults (see ``count_num_of_faults``).
        fitness: score used for ordering; larger is better.
    """

    def __init__(self, graph, partition):
        self.no_faults_award = 2
        self.graph = graph
        self.partition = partition
        self.num_of_colors = len(partition)
        self.coloring = self.create_coloring_data()
        self.num_of_faults = self.count_num_of_faults()
        self.fitness = self.fitness_function()

    def __str__(self):
        """Return the string form of the partition."""
        return str(self.partition)

    def __lt__(self, other):
        """Order individuals by fitness, which makes lists of them sortable."""
        return self.fitness < other.fitness

    def create_coloring_data(self):
        """Return a dict mapping every node of the partition to its color index."""
        return {
            node: color
            for color, nodes_with_same_color in enumerate(self.partition)
            for node in nodes_with_same_color
        }

    def count_neighbors_with_same_color(self):
        """Count the nodes that have an edge to another node of their own part.

        The adjacency test is a direct ``has_edge`` lookup instead of
        scanning the node's non-neighbors for every candidate.
        """
        has_edge = self.graph.has_edge
        return sum(
            1
            for part in self.partition
            for node in part
            if any(other != node and has_edge(node, other) for other in part)
        )

    def pair_without_neighbor_nodes(self, pair):
        """Return True when no edge (in either direction) joins the two parts.

        ``pair`` is a sequence whose first two items are the parts to compare.
        """
        first_part, second_part = pair[0], pair[1]
        has_edge = self.graph.has_edge
        return not any(
            has_edge(u, v) or has_edge(v, u)
            for u in first_part
            for v in second_part
        )

    def count_colors_pairs_without_neighbor_nodes(self):
        """Count the unordered pairs of parts that have no edge between them."""
        # combinations() yields nothing for fewer than two parts, so a
        # single-color partition naturally scores 0 here.
        return sum(
            1
            for pair in it.combinations(self.partition, 2)
            if self.pair_without_neighbor_nodes(pair)
        )

    def count_num_of_faults(self):
        """Return same-color adjacency faults plus unconnected color-pair faults."""
        return (
            self.count_neighbors_with_same_color()
            + self.count_colors_pairs_without_neighbor_nodes()
        )

    def fitness_function(self):
        """Return the fitness score.

        A fault-free individual scores ``num_of_colors * no_faults_award``;
        otherwise the score is ``num_of_colors / num_of_faults``.
        """
        if self.num_of_faults == 0:
            return self.num_of_colors * self.no_faults_award
        return self.num_of_colors / self.num_of_faults
    

In [206]:
def generate_init_population(graph, population_size):
    """Build ``population_size`` random individuals for ``graph``.

    For each individual a color count is drawn between 1 and the number of
    nodes (inclusive), the nodes are shuffled, and the shuffled sequence is
    cut into that many consecutive chunks with ``np.array_split``.

    Returns:
        list of ``Individual`` objects.
    """
    population = []
    for _ in range(population_size):
        color_count = np.random.randint(1, graph.number_of_nodes() + 1)
        shuffled_nodes = np.array(graph.nodes())
        np.random.shuffle(shuffled_nodes)
        chunks = np.array_split(shuffled_nodes, color_count)
        # Convert the numpy chunks back to plain Python lists.
        population.append(Individual(graph, [chunk.tolist() for chunk in chunks]))
    return population

In [207]:
def selection(population):
    """Return one member of ``population`` chosen uniformly at random.

    Fitness plays no role in the draw.
    """
    chosen_index = np.random.randint(len(population))
    return population[chosen_index]

In [208]:
def create_partition_from_coloring(coloring):
    """Convert a node -> color mapping into a list of color classes.

    Args:
        coloring: dict mapping each node to a non-negative integer color.

    Returns:
        A list with ``max(color) + 1`` lists; entry ``c`` holds the nodes
        colored ``c`` in the mapping's iteration order. Colors that no node
        uses yield empty lists. An empty mapping yields ``[]``.
    """
    if not coloring:
        # max() would raise ValueError on an empty mapping.
        return []
    partition = [[] for _ in range(max(coloring.values()) + 1)]
    for node, color in coloring.items():
        partition[color].append(node)
    return partition

In [209]:
def crossover(graph, parent_1, parent_2):
    """Three-point crossover of two parents' colorings.

    Three cut points are drawn in ``[0, number_of_nodes)`` and sorted. Node
    labels are converted with ``int()`` and compared with the cut points, so
    the nodes fall into four consecutive index segments. Child 1 takes its
    colors from parent 1 on the first and third segment and from parent 2 on
    the second and fourth; child 2 is the mirror image. Nodes whose integer
    label is ``>= number_of_nodes`` receive no color in either child.

    Returns:
        ``[child_1, child_2]`` as new ``Individual`` objects.
    """
    node_count = graph.number_of_nodes()
    crosspoints = np.random.randint(node_count, size=3)
    crosspoints.sort()

    child_1_coloring = {}
    child_2_coloring = {}
    for node in nx.nodes(graph):
        position = int(node)
        if position >= node_count:
            continue
        # Segment index = how many cut points lie at or below this position.
        segment = sum(position >= point for point in crosspoints)
        if segment % 2 == 0:
            donor_for_1, donor_for_2 = parent_1, parent_2
        else:
            donor_for_1, donor_for_2 = parent_2, parent_1
        child_1_coloring[node] = donor_for_1.coloring[node]
        child_2_coloring[node] = donor_for_2.coloring[node]

    children = []
    for child_coloring in (child_1_coloring, child_2_coloring):
        child_partition = create_partition_from_coloring(child_coloring)
        children.append(Individual(graph, child_partition))
    return children
    
        
        

In [210]:
def mutation(individual, mutation_factor):
    """Randomly recolor nodes of ``individual``.

    Each node is recolored when a uniform draw from [0, 1) is not greater
    than ``mutation_factor``; the new color is drawn uniformly from
    ``range(individual.num_of_colors)``.

    Args:
        individual: the individual to mutate. It is not modified.
        mutation_factor: per-node recoloring threshold.

    Returns:
        ``individual`` itself when no node was recolored, otherwise a new
        ``Individual`` built from the recolored mapping.
    """
    # Work on a copy so the caller's individual keeps a coloring that
    # matches its partition and fitness.
    mutated_coloring = dict(individual.coloring)
    changed = False
    for node in list(mutated_coloring):
        if np.random.uniform() > mutation_factor:
            continue
        mutated_coloring[node] = np.random.randint(0, individual.num_of_colors)
        changed = True
    if not changed:
        return individual
    # Build (and evaluate) the mutated individual once, after all recoloring.
    return Individual(individual.graph, create_partition_from_coloring(mutated_coloring))
        

In [211]:
def random_part_split(individual):
    """Split one randomly chosen part that has at least two nodes into two halves.

    The part is drawn uniformly from the parts that can be split, so a split
    always happens when one is possible. For an odd-sized part the first
    half receives the extra node.

    Args:
        individual: the individual whose partition is split. It is not
            modified.

    Returns:
        A new ``Individual`` with one more part, or ``individual`` itself
        when no part has two or more nodes.
    """
    splittable = [
        index
        for index, part in enumerate(individual.partition)
        if len(part) >= 2
    ]
    if not splittable:
        return individual
    part_to_split = splittable[np.random.randint(len(splittable))]

    new_partition = []
    for index, part in enumerate(individual.partition):
        if index == part_to_split:
            middle = (len(part) + 1) // 2
            new_partition.append(list(part[:middle]))
            new_partition.append(list(part[middle:]))
        else:
            new_partition.append(list(part))
    return Individual(individual.graph, new_partition)
            

In [212]:
def random_color_swap(individual):
    """Exchange the colors of two randomly drawn nodes.

    Two indices are drawn (with replacement) from
    ``range(graph.number_of_nodes())`` and converted to strings, which are
    used as node keys of the coloring.

    Args:
        individual: the individual to start from. It is not modified.

    Returns:
        A new ``Individual`` built from the swapped coloring.
    """
    # Copy first: writing through the original dict would corrupt the input
    # individual and turn the exchange into a one-way copy of a color.
    new_coloring = dict(individual.coloring)
    first_node, second_node = map(
        str, np.random.randint(0, individual.graph.number_of_nodes(), 2)
    )
    new_coloring[first_node], new_coloring[second_node] = (
        new_coloring[second_node],
        new_coloring[first_node],
    )
    return Individual(individual.graph, create_partition_from_coloring(new_coloring))

In [213]:
def simulated_annealing(individual, annealing_factor):
    """Run ``annealing_factor`` annealing steps starting from ``individual``.

    At every step a candidate is produced from the current state: a part
    split when the current state has no faults, a color swap otherwise. The
    candidate replaces the current state when its fitness is higher, or
    otherwise when a uniform draw falls below the control parameter
    ``1 / sqrt(step + 1)``.

    Args:
        individual: starting individual.
        annealing_factor: number of steps to perform.

    Returns:
        The state held after the last step (``individual`` itself when
        ``annealing_factor`` is 0).
    """
    annealed_individual = individual
    for step in range(annealing_factor):
        control_parameter = 1.0 / (step + 1) ** 0.5

        # Choose the move from the state being annealed, not from the
        # starting individual, so the move follows the current fault count.
        if annealed_individual.num_of_faults == 0:
            candidate = random_part_split(annealed_individual)
        else:
            candidate = random_color_swap(annealed_individual)

        # The random draw only happens when the candidate is not an improvement.
        if (candidate.fitness > annealed_individual.fitness
                or control_parameter > np.random.uniform()):
            annealed_individual = candidate

    return annealed_individual

In [214]:
def start_evolution(graph, population_size, num_of_iterations, mutation_factor, annealing_factor, optimal_solution):
    """Run the genetic algorithm with annealing and return the final population.

    Every generation the population is sorted by descending fitness. The run
    stops early when some individual has no faults and exactly
    ``optimal_solution`` colors. Otherwise the top quarter is carried over,
    the following slots are filled pairwise with mutated crossover children
    of randomly selected parents, and the first and last slot of the new
    generation are passed through ``simulated_annealing``.

    Returns:
        ``[population, num_of_executed_iterations]`` where ``population`` is
        sorted by descending fitness.
    """
    population = generate_init_population(graph, population_size)
    new_population = population.copy()
    elite_count = int(population_size / 4)
    last_slot = population_size - 1
    num_of_executed_iterations = 0

    for _ in range(num_of_iterations):
        population.sort(reverse=True)

        optimum_found = any(
            candidate.num_of_faults == 0 and candidate.num_of_colors == optimal_solution
            for candidate in population
        )
        if optimum_found:
            return [population, num_of_executed_iterations]

        # Carry the best quarter over unchanged.
        for slot in range(elite_count):
            new_population[slot] = population[slot]

        # Fill the following slots two at a time with offspring.
        for slot in range(elite_count, last_slot, 2):
            mother = selection(population)
            father = selection(population)
            offspring = crossover(graph, mother, father)
            new_population[slot] = mutation(offspring[0], mutation_factor)
            new_population[slot + 1] = mutation(offspring[1], mutation_factor)

        for slot in (0, last_slot):
            new_population[slot] = simulated_annealing(new_population[slot], annealing_factor)

        population = new_population.copy()
        num_of_executed_iterations += 1

    population.sort(reverse=True)
    return [population, num_of_executed_iterations]

            
        

In [217]:
def record_test_data():
    """Benchmark the algorithm on the stored random graphs and save the results.

    Reads the brute-force reference results from
    ``test_results/achromatic_brute_force.json``, then for every graph file
    (``n`` in 2..14, ``v`` in 1..5) runs ``start_evolution`` with fixed
    parameters, timing the call. The reported individual is the first
    fault-free member of the final population, or its first member when none
    is fault-free. The accumulated results are rewritten to
    ``test_results/achromatic_genetic_with_annealing.json`` after every graph.
    """
    np.random.seed()
    with open('test_results/achromatic_brute_force.json', 'r') as reference_file:
        brute_force_test_results = json.load(reference_file)

    test_data = {}
    for n in range(2, 15):
        for v in range(1, 6):
            file_name = 'random_graphs/random_graph_n' + str(n) + '_v' + str(v) + '.gml'
            graph = nx.read_gml(file_name)
            optimal_solution = brute_force_test_results[file_name]['achromatic_number']

            start_time = time.time()
            final_population, num_of_executed_iterations = start_evolution(
                graph=graph,
                population_size=150,
                num_of_iterations=5000,
                mutation_factor=0.05,
                annealing_factor=100,
                optimal_solution=optimal_solution)
            execution_duration = time.time() - start_time

            # Prefer the first fault-free individual; fall back to the first one.
            result = next(
                (candidate for candidate in final_population if candidate.num_of_faults == 0),
                final_population[0],
            )

            test_data[file_name] = {
                'nodes_num': graph.number_of_nodes(),
                'edges_num': graph.number_of_edges(),
                'achromatic_number': optimal_solution,
                'result': result.num_of_colors,
                'is_optimal': result.num_of_colors == optimal_solution,
                'is_valid': result.num_of_faults == 0,
                'iterations_num': num_of_executed_iterations,
                'execution_duration': execution_duration,
            }
            # Rewrite the output after every graph so partial results are kept on disk.
            with open('test_results/achromatic_genetic_with_annealing.json', 'w') as results_file:
                json.dump(test_data, results_file)


In [218]:
# Run the benchmark: reads the reference results and graph files from disk,
# runs the evolution for each graph, and writes the results JSON.
record_test_data()