In [76]:
import random
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
from steinlib.instance import SteinlibInstance 
from steinlib.parser import SteinlibParser

In [80]:
class MySteinlibInstance(SteinlibInstance): 
    def __init__(self):
        self.my_graph = nx.Graph() 
        self.terms = []
        
    def terminals__t(self, line, converted_token): 
        self.terms.append(converted_token[0])
        
    def graph__e(self, line, converted_token):
        e_start = converted_token [0]
        e_end   = converted_token [1]
        weight  = converted_token [2]
        self.my_graph.add_edge(e_start , e_end , weight=weight)

In [82]:
def load_B04 ():
    stein_file = "data/B/b04.stp" 
    my_class = MySteinlibInstance() 
    with open(stein_file) as my_file:
        my_parser = SteinlibParser(my_file , my_class) 
        my_parser.parse()
        terms = my_class.terms
        graph = my_class.my_graph
    return graph , terms


In [2]:
def create_graphs(n,s,model,**kwargs):
    Graphs = []
    while len(Graphs) < n:
        G = model(s,**kwargs)
        while nx.is_connected(G) is False:
            G = model(s,**kwargs)
        Graphs.append(G)
    return Graphs

In [3]:
def lower_bound_optimal_solution(G,terms):
    shortest_path_bound = 0
    for i in range(len(terms)):
        for j in range(i + 1, len(terms)):
            path_length = nx.shortest_path_length(G, source=terms[i], target=terms[j], weight='weight')
            shortest_path_bound = max(shortest_path_bound, path_length)
    return shortest_path_bound

In [4]:
def average_lower_bound(graph_series,terms):
    total_bound = 0
    for G in graph_series:
        lb = lower_bound_optimal_solution(G, terms)
        total_bound += lb
    average_bound = total_bound / len(graph_series)
    return average_bound

In [12]:
def eval_sol(graph, terminals, sol) -> float:
    graph_sol = nx.Graph()
    for (i, j) in sol:
        if (i, j) in graph_sol.edges:
            graph_sol[i][j]['weight'] = 1 + graph_sol[i][j].get('weight', 1)
        else:
            graph_sol.add_edge(i, j, weight=graph[i][j].get('weight', 1))

    if not nx.is_tree(graph_sol):
        print ("Error: the proposed solution is not a tree")
        return -1

    for i in terminals:
        if not i in graph_sol:
            print ("Error: a terminal is missing from the solution")
            return -1

    cost = graph_sol.size(weight='weight')
    return cost

In [32]:
def two_approximation(G, terms):
    # Step 1: Create the complete graph K using shortest paths between all terminal pairs
    K = nx.Graph()
    shortest_paths = {}
    for u in terms:
        for v in terms:
            if u != v:
                length, path = nx.single_source_dijkstra(G, u, target=v)
                K.add_edge(u, v, weight=length)
                shortest_paths[(u, v)] = path

    # Step 2: Compute the Minimum Spanning Tree A from graph K
    A = nx.minimum_spanning_tree(K, algorithm="kruskal")

    # Step 3: Unfold shortest path edges in G for each edge (u, v) in A
    steiner_edges = []
    for u, v in A.edges():
        path = shortest_paths[(u, v)]
        
        for i in range(len(path) - 1):
            steiner_edges.append((path[i], path[i + 1]))

    return steiner_edges

In [48]:
def generate_initial_steiner_tree(G, terminals):
    steiner_edges = two_approximation(G, terminals)
    steiner_tree = nx.Graph()
    steiner_tree.add_edges_from(steiner_edges)
    return steiner_tree


In [40]:
def prune_tree(tree, terminals):
    nodes_to_remove = [node for node in tree.nodes()
                       if tree.degree(node) == 1 and node not in terminals]
    while nodes_to_remove:
        tree.remove_nodes_from(nodes_to_remove)
        nodes_to_remove = [node for node in tree.nodes()
                           if tree.degree(node) == 1 and node not in terminals]
    return tree

In [68]:
def rand_neighbor(G, current_solution, terminals):
    new_solution = current_solution.copy()
    
    operation = random.choice(["add_node", "remove_edge", "swap_edge", "prune_leaf"])
    if operation == "add_node":
        non_tree_nodes = set(G.nodes) - set(new_solution.nodes)
        if non_tree_nodes:
            new_node = random.choice(list(non_tree_nodes))
            neighbor_nodes = [n for n in new_solution.nodes if G.has_edge(n, new_node)]
            if neighbor_nodes:
                neighbor = random.choice(neighbor_nodes)
                weight = G[neighbor][new_node].get('weight', 1)
                new_solution.add_edge(neighbor, new_node, weight=weight)

    elif operation == "remove_edge":
        removable_edges = [(u, v) for u, v in new_solution.edges()
                           if new_solution.degree(u) > 1 and new_solution.degree(v) > 1
                           and u not in terminals and v not in terminals]
        if removable_edges:
            edge_to_remove = random.choice(removable_edges)
            new_solution.remove_edge(*edge_to_remove)
            if not nx.is_connected(new_solution):
                new_solution.add_edge(*edge_to_remove)

    elif operation == "swap_edge":
        u, v = random.choice(list(new_solution.edges()))
        new_solution.remove_edge(u, v)
        try:
            path = nx.shortest_path(G, source=u, target=v, weight='weight')
            nx.add_path(new_solution, path)
        except nx.NetworkXNoPath:
            weight = G[u][v].get('weight', 1)
            new_solution.add_edge(u, v, weight=weight)
            
    elif operation == "prune_leaf":
        new_solution = prune_tree(new_solution, terminals)
        
    if nx.is_tree(new_solution) and all(term in new_solution.nodes for term in terminals):
        return new_solution
    else:
        return current_solution

In [69]:
def simulated_annealing(G, terms, T_init=None, T_end=1e-3, max_iterations=1000):
    if T_init is None:
        T_init = 10 * len(G.edges())
    current_solution = generate_initial_steiner_tree(G, terms)
    if current_solution is None:
        raise ValueError("Failed to generate a valid initial Steiner tree.")
        
    current_weight = eval_sol(G, terms, current_solution.edges())
    best_solution = current_solution.copy()
    best_weight = current_weight
    iteration = 0
    accept_count = 0
    T = T_init
    while iteration < max_iterations and T > T_end:
        new_solution = rand_neighbor(G, current_solution, terms)    
        new_weight = eval_sol(G, terms, new_solution.edges())
        
        if new_weight < current_weight:
            proba = 1
        else:
            proba = np.exp((current_weight - new_weight) / T)
            
        if random.uniform(0, 1) < proba:
            current_solution = new_solution
            current_weight = new_weight
            accept_count += 1
            
            if current_weight < best_weight:
                best_solution = current_solution
                best_weight = current_weight
        
        iteration += 1
        T = T_init / (1 + iteration)
                    
        if iteration % 100 == 0:
            acceptance_ratio = accept_count / 100
            accept_count = 0
            if acceptance_ratio < 0.05:
                T *= 0.9  # Cool down faster if acceptance is low
            elif acceptance_ratio > 0.6:
                T *= 1.1  # Cool down slower if acceptance is high

    return best_solution, best_weight


In [70]:
def average_initial_solution(Graphs,f,terms):
    total_weight = 0
    for G in Graphs:
        weight = eval_sol(G,terms,f(G,terms))
        total_weight += weight
    average_weight = total_weight / len(Graphs)
    return average_weight

In [71]:
def average_solution(Graphs,terms,T_init=100,T_end=1,nb_iter_per_temp=50):
    total_weight = 0
    for G in Graphs: 
        _, weight = simulated_annealing(G, terms, T_init, T_end, nb_iter_per_temp)
        total_weight += weight
    average_weight = total_weight / len(Graphs)
    return average_weight

In [72]:
graph_series = create_graphs(50, 50, nx.generators.binomial_graph, p=0.5)
terminals = [0, 1, 2, 3, 4]
average_initial_weight = average_initial_solution(graph_series, two_approximation, terminals)
average_weight = average_solution(graph_series,terminals)
average_l_bound = average_lower_bound(graph_series, terminals)
print(f"Average Lower Bound: {average_l_bound}")
print(f"Average Initial Steiner Tree weight: {average_initial_weight}")
print(f"Average Steiner Tree weight: {average_weight}")

Average Lower Bound: 2.0
Average Initial Steiner Tree weight: 4.4
Average Steiner Tree weight: 4.36
