In [None]:
import os
import time
import numpy as np
import random
from copy import deepcopy
from time import time
from itertools import combinations
import copy
import argparse
import pandas as pd
np.random.seed(42)
random.seed(42)


def load_instance(
    source: str ="/home/jdnunes/Documents/Heuristics&MetaHeuristics/SCP-Instances",
    instance: str ='scpd5.txt'
):
    with open(os.path.join(source, instance)) as f:
        data = f.readlines()
        
    return data 


def redundancy_elimination(solution, sets, current_cost, costs):
        # list of redundant subsets
        redundant = []
        # iterate over the set of subsets
        for i, set1 in enumerate(list(solution.keys())):
            # compare current position with all others
            flags = [False for elem in solution[set1]]
            for j, set2 in enumerate(list(solution.keys())[i+1:]):
                for k, elem in enumerate(solution[set1]):
                    if elem in sets[set2]:
                        flags[k]=True
            # if all elements of subset1 covered in other subsets, append subset1 to redundant
            if all(flags)==True:
                redundant.append(set1)
        for set_ in redundant:
            # remove the redundant subsets from the sets
            solution.pop(set_)
            # update the cost
            current_cost-=costs[int(set_)-1]

        return solution, current_cost

In [None]:
def process_data(data):  
    # first line : nattr, nsets
    for i, elem in enumerate(data):
        data[i] = [int(e) for e in elem.strip().split(" ")]

    # second line: costs
    costs = []
    for i, elem in enumerate(data[1:]):
        if len(elem)==1 and len(data[i+2])>1:
            break
        costs.extend(elem)
    
    del data[1:i+1]
    data.insert(1, costs)

    # following lines (MxK), with M=nattr, and K=num sets that cover attribute m: nsets, list(sets)

    # 1: store de row indices that indicate nsets
    indices=[]
    for j, line in enumerate(data[2:-1]):
        idx = j+2
        if len(line)==1 and len(data[idx+1])>1:
            # a line with 1 element is only an index if the next line contains more than 1 element
            # otherwise, it could still be a set (e.g. 25 (12+12+1))
            indices.append(idx)
    # 2: iterate over the sets:
    sets_temp = []
    indices.append(len(data))
    for i, idx in enumerate(indices[:-1]):
        s = []
        for j, elem in enumerate(data[idx+1:indices[i+1]]):
            s.extend(elem)
        sets_temp.append(s)
    
    sets = {}
    
    for i, set_list in enumerate(sets_temp):
        attr = i+1
        for set_ in set_list:
            if str(set_) in list(sets.keys()):
                sets[str(set_)].append(attr)
            else:
                sets[str(set_)] = [attr]
    
    del data[2:]
    data.extend(sets)
    m=data[0][0]
    universe = list(range(1, m+1))
    costs =  data[1]
    return data, universe, sets, costs


In [None]:
# Assignment 2: Improvement Heuristic for the Set Covering Problem (SCP)
import sys

def ih_best_neighbour(solution, universe, sets, cost, costs, k=10, topk=True):
    
    # do best neighbour local_search while cost is improving
    improving = True
    if topk:
        topk_costs = [(cost-i) for i in range(k)]
        topk_sols = [solution for _ in range(k)]
    while improving:
        improving = False
        # list of set pairs to swap
        keys = list(solution.keys())
        aggregated_sets = [comb for comb in combinations(keys, 2)]

        new_solution = copy.deepcopy(solution)
        # initialize best_cost
        best_cost = copy.deepcopy(cost)
        
        for j, pair_of_sets in enumerate(aggregated_sets):
            # remove next k subsets fom solution
            reduced_solution = copy.deepcopy(solution)
            previous_cost = 0 # cost of the two sets that will be removed
            for j in range(2):
                reduced_solution.pop(pair_of_sets[j])
                previous_cost+=costs[int(pair_of_sets[j])-1]
            # assess attributes not covered in reduced_solution
            uncovered = copy.deepcopy(universe)
            for k,v in reduced_solution.items():
                for elem in v:
                    if elem in uncovered:
                        uncovered.remove(elem)
            # solve the reduced set covering
            # select sets that cover the uncovered attributes
            covered = []
            sub_solution = dict()
            sub_cost=0
            search_sets = dict()
            # reduce search space for the sets that cover the still uncovered attributes
            for k, v in sets.items():
                if any(item in v for item in uncovered):
                    search_sets[k]=v
            # assess every feasible solution for the reduced set size covering problem
            
            # limit the neighborhood to all feasible solutions that add 2 sets to the reduced size set covering problem
            all_set_combs = [comb for comb in combinations(search_sets.keys(), 2)]
            # all_set_combs.extend([comb for comb in combinations(search_sets.keys(), 3)])
            feasible_set_combs = []
            for comb in all_set_combs:
                # check if is a feasible solution
                attlist = [search_sets[k] for k in comb]
                covered_attributes = [v for vlist in attlist for v in vlist]
                if all(item in covered_attributes for item in uncovered):
                    feasible_set_combs.append(comb) # append to feasible set combs
                    
            for comb in feasible_set_combs:
                sub_cost = 0
                for k in comb:
                    sub_cost+=costs[int(k)-1]
                curr_cost = cost - previous_cost + sub_cost
                if curr_cost < best_cost:
                    new_solution = copy.deepcopy(reduced_solution)
                    s = {k:search_sets[k] for k in comb}
                    new_solution.update(s)
                    best_cost = curr_cost
                    new_solution, best_cost = redundancy_elimination(new_solution, sets, best_cost, costs)
                    del topk_sols[topk_costs.index(max(topk_costs))]
                    topk_costs.remove(max(topk_costs))
                    topk_costs.append(best_cost)
                    topk_sols.append(new_solution)
                    improving = True
           
        solution = copy.deepcopy(new_solution)
        cost = copy.deepcopy(best_cost)
    if topk:
        return solution, list(solution.keys()), cost, topk_sols, topk_costs
    return solution, list(solution.keys()), cost


def ih_first_neighbour(solution, universe, sets, cost, costs, K=2):
    
    # do best neighbour local_search while cost is improving
    improving = True
    while improving:
        improving = False
        # list of set pairs to swap
        keys = list(solution.keys())
        aggregated_sets = [comb for comb in combinations(keys, 2)]

        new_solution = copy.deepcopy(solution)
        # initialize best_cost
        best_cost = copy.deepcopy(cost)
        
        for j, pair_of_sets in enumerate(aggregated_sets):
            if improving:
                break
            # remove next k subsets fom solution
            reduced_solution = copy.deepcopy(solution)
            previous_cost = 0 # cost of the two sets that will be removed
            for j in range(2):
                reduced_solution.pop(pair_of_sets[j])
                previous_cost+=costs[int(pair_of_sets[j])-1]
            # assess attributes not covered in reduced_solution
            uncovered = copy.deepcopy(universe)
            for k,v in reduced_solution.items():
                for elem in v:
                    if elem in uncovered:
                        uncovered.remove(elem)
            # solve the reduced set covering
            # select sets that cover the uncovered attributes
            covered = []
            sub_solution = dict()
            sub_cost=0
            search_sets = dict()
            # reduce search space for the sets that cover the still uncovered attributes
            for k, v in sets.items():
                if any(item in v for item in uncovered):
                    search_sets[k]=v
            # assess every feasible solution for the reduced set size covering problem
            
            # limit the neighborhood to all feasible solutions that add 2 sets to the reduced size set covering problem
            all_set_combs = [comb for comb in combinations(search_sets.keys(), 2)]
            # all_set_combs.extend([comb for comb in combinations(search_sets.keys(), 3)])
            feasible_set_combs = []
            for comb in all_set_combs:
                # check if is a feasible solution
                attlist = [search_sets[k] for k in comb]
                covered_attributes = [v for vlist in attlist for v in vlist]
                if all(item in covered_attributes for item in uncovered):
                    feasible_set_combs.append(comb) # append to feasible set combs
                    
            for comb in feasible_set_combs:
                sub_cost = 0
                for k in comb:
                    sub_cost+=costs[int(k)-1]
                curr_cost = cost - previous_cost + sub_cost
                if curr_cost < best_cost:
                    new_solution = copy.deepcopy(reduced_solution)
                    s = {k:search_sets[k] for k in comb}
                    new_solution.update(s)
                    best_cost = curr_cost
                    new_solution, best_cost = redundancy_elimination(new_solution, sets, best_cost, costs)
                    improving = True
                    break
           
        solution = copy.deepcopy(new_solution)
        cost = copy.deepcopy(best_cost)
        
    return solution, list(solution.keys()), cost

# Assignment 3: Escaping Local Optima

def geometric_cooling(t, alpha=0.99):
    return alpha*t


def simulated_annealing(
    solution,
    universe,
    sets,
    cost,
    costs,
    K=2,
    t=50,
    max_iter=10000,
):
    
    # do best neighbour local_search while cost is improving
    improving = True
    # count iterations
    
    step = 0
    for j in range(max_iter):
        if not improving:
            break
        improving = False
        # list of set pairs to swap
        keys = list(solution.keys())
        aggregated_sets = [comb for comb in combinations(keys, 2)]

        new_solution = copy.deepcopy(solution)
        # initialize best_cost
        best_cost = copy.deepcopy(cost)
        
        for i, pair_of_sets in enumerate(aggregated_sets):
            if improving:
                break
            # remove next k subsets fom solution
            reduced_solution = copy.deepcopy(solution)
            previous_cost = 0 # cost of the two sets that will be removed
            for j in range(2):
                reduced_solution.pop(pair_of_sets[j])
                previous_cost+=costs[int(pair_of_sets[j])-1]
            # assess attributes not covered in reduced_solution
            uncovered = copy.deepcopy(universe)
            for k,v in reduced_solution.items():
                for elem in v:
                    if elem in uncovered:
                        uncovered.remove(elem)
            # solve the reduced set covering
            # select sets that cover the uncovered attributes
            covered = []
            sub_solution = dict()
            sub_cost=0
            search_sets = dict()
            # reduce search space for the sets that cover the still uncovered attributes
            for k, v in sets.items():
                if any(item in v for item in uncovered):
                    search_sets[k]=v
            # assess every feasible solution for the reduced set size covering problem
            
            # limit the neighborhood to all feasible solutions that add 2 sets to the reduced size set covering problem
            all_set_combs = [comb for comb in combinations(search_sets.keys(), 2)]
            # all_set_combs.extend([comb for comb in combinations(search_sets.keys(), 3)])
            feasible_set_combs = []
            for comb in all_set_combs:
                # check if is a feasible solution
                attlist = [search_sets[k] for k in comb]
                covered_attributes = [v for vlist in attlist for v in vlist]
                if all(item in covered_attributes for item in uncovered):
                    feasible_set_combs.append(comb) # append to feasible set combs
                    
            for comb in feasible_set_combs:
                sub_cost = 0
                for k in comb:
                    sub_cost+=costs[int(k)-1]
                curr_cost = cost - previous_cost + sub_cost
                p = np.exp(-((curr_cost-best_cost)/t))
                if (curr_cost < best_cost) or (np.random.random() < p):
                    new_solution = copy.deepcopy(reduced_solution)
                    s = {k:search_sets[k] for k in comb}
                    new_solution.update(s)
                    best_cost = curr_cost
                    new_solution, best_cost = redundancy_elimination(new_solution, sets, best_cost, costs)
                    improving = True 
                step+=1
                # update the temperature parameter every 2 steps
                if (step%2)==0:
                    t = geometric_cooling(t)
        solution = copy.deepcopy(new_solution)
        cost = copy.deepcopy(best_cost)
        
    return solution, list(solution.keys()), cost

In [None]:
def print_avg_pc_deviation(heuristic, parent_dir, out, instances):
    
    f1 = "/home/jdnunes/Documents/Heuristics&MetaHeuristics//Assignment 2//results//scp_best.txt"
    f2 = os.path.join(parent_dir,"results",out+"-"+ heuristic+".txt")
    df1 = pd.read_csv(f1, sep=" ", header=None, names=["id", "cost"])
    df1 = df1.loc[df1['id'].isin(instances)]
    df2 = pd.read_csv(f2, sep=" ", header=None, names=["id", "cost"])
    c1 = np.array(list(df1["cost"]), dtype=np.float32)
    c2 = np.array(list(df2["cost"]), dtype=np.float32)
    pc_dev = ((c2 / c1) - 1)*100
    avg_pc_deviation = np.mean(pc_dev)
    for i, inst in enumerate(instances):
        with open(os.path.join(parent_dir,"results",out+"-"+ heuristic+"-pc-dev.txt"), "a") as f:
            f.write(str(inst)+' '+str(pc_dev[i])+'\n')
    print(f"Average percent deviation from best known solutions ({heuristic}): {avg_pc_deviation:.2f} %")

def ttest(parent_dir, groups):
    
    f1 = os.path.join(parent_dir,"results",groups[0]+".txt")
    f2 = os.path.join(parent_dir,"results",groups[1]+".txt") 
    
    df1 = pd.read_csv(f1, sep=" ", header=None, names=["id", "cost"])
    df2 = pd.read_csv(f2, sep=" ", header=None, names=["id", "cost"])
    
    a = df1["cost"].to_numpy()
    b = df2["cost"].to_numpy()
    _, pval = scipy.stats.ttest_ind(a, b)
    
    return pval


def options():
   
    parser = argparse.ArgumentParser()
    parser.add_argument("--parent_dir", type=str, default="/home/jdnunes/Documents/Heuristics&MetaHeuristics/Assignment 2")
    parser.add_argument("--out", type=str, default="scp-ch4")
    parser.set_defaults()
    args, unknown = parser.parse_known_args()
    return args


def main():
    
    instances = [
    "4.2", "4.3", "4.4", "4.5", "4.6", "4.7", "4.8", "4.9",
    "5.1", "5.2", "5.3", "5.4", "5.5", "5.6", "5.7", "5.8", "5.9",
    "6.1", "6.2", "6.3", "6.4", "6.5",
    "A.1", "A.2", "A.3", "A.4", "A.5",
    "B.1", "B.2", "B.3", "B.4", "B.5",
    "C.1", "C.2", "C.3", "C.4", "C.5",
    "D.1", "D.2", "D.3", "D.4", "D.5",
    ]
    args = options()
    
    start = time()

    bi_elapsed_time = 0
    ch_elapsed_time = 0
    fi_elapsed_time = 0
    sa_elapsed_time = 0
    for inst in instances:
        
        instance = 'scp'+''.join(inst.split('.')).lower() +'.txt'
        
        data = load_instance(instance=instance)
        
        _, universe, sets, costs = process_data(data)
        
        # initial constructive heuristic (incumbent solution)
        ch_start = time()
        init_solution, _, cost = ch4(universe, sets, costs, True)
        ch_end = time()
        ch_elapsed_time += (ch_end - ch_start)
        
        ## improvement heuristic (best neighbour)
        bi_start = time()
        bi_solution, _, bi_cost, topk_sols, topk_costs = ih_best_neighbour(init_solution, universe, sets, cost, costs)
        bi_end = time()
        bi_elapsed_time += (bi_end - bi_start)
        
        
        # simulated_annealing
        sa_start = time()
        best_sa_cost = bi_cost
        # GRASP: restart simulated annealing k=10 times and keep the solution with the best cost
        for i, sol in enumerate(topk_sols):
            sa_solution, _, sa_cost = simulated_annealing(sol, universe, sets, topk_costs[i], costs)
            if sa_cost < best_sa_cost:
                best_sa_cost = sa_cost
                best_sa_solution = sa_solution
        
        sa_end = time()
        sa_elapsed_time += (sa_end - sa_start)
        # improvement heuristic (first neighbour)
        #fi_start = time()
        #fi_solution,_,fi_cost = ih_first_neighbour(init_solution, universe, sets, cost, costs)
        #fi_end = time()
        #fi_elapsed_time += (fi_end - fi_start)
        #with open(os.path.join(args.parent_dir,"results",args.out+".txt"), "a") as f:
        #       f.write(str(inst)+' '+str(cost)+'\n')
        #with open(os.path.join(args.parent_dir,"results",args.out+"-fi.txt"), "a") as f:
        #       f.write(str(inst)+' '+str(fi_cost)+'\n')
        with open(os.path.join(args.parent_dir,"results",args.out+"-simulated-annealing.txt"), "a") as f:
               f.write(str(inst)+' '+str(best_sa_cost)+'\n')
    end = time()
    print(f"Finished! Total elapsed time: {end - start:.3f} s")
    print(f"Constructive heuristic elapsed time (total): {ch_elapsed_time:.3f} s")
    #print(f"First improvement elapsed time (total): {fi_elapsed_time:.3f} s")
    print(f"Simulated Annealing elapsed time (total): {sa_elapsed_time:.3f} s")
    
    print(f"Constructive heuristic elapsed time (average): {ch_elapsed_time/len(instances):.3f} s")
    # print(f"First improvement elapsed time (average): {fi_elapsed_time/len(instances):.3f} s")
    print(f"Simulated Annealing elapsed time (average): {sa_elapsed_time/len(instances):.3f} s")
    
    # print_avg_pc_deviation("fi", args.parent_dir, args.out, instances)
    print_avg_pc_deviation("simulated-annealing", args.parent_dir, args.out, instances)
    
    
if __name__ == '__main__':
    main()


In [None]:
import os
import pandas as pd
import numpy as np
def print_avg_pc_deviation(heuristic, parent_dir, out, instances):
    
    f1 = "/home/jdnunes/Documents/Heuristics&MetaHeuristics//Assignment 2//results//scp_best.txt"
    f2 = os.path.join(parent_dir,"results",out+"-"+ heuristic+".txt")
    df1 = pd.read_csv(f1, sep=" ", header=None, names=["id", "cost"])
    df1 = df1.loc[df1['id'].isin(instances)]
    df2 = pd.read_csv(f2, sep=" ", header=None, names=["id", "cost"])
    c1 = np.array(list(df1["cost"]), dtype=np.float32)
    c2 = np.array(list(df2["cost"]), dtype=np.float32)
    pc_dev = ((c2 / c1) - 1)*100
    avg_pc_deviation = np.mean(pc_dev)
    for i, inst in enumerate(instances):
        with open(os.path.join(parent_dir,"results",out+"-"+ heuristic+"-pc-dev.txt"), "a") as f:
            f.write(str(inst)+' '+str(pc_dev[i])+'\n')
    print(f"Average percent deviation from best known solutions ({heuristic}): {avg_pc_deviation:.2f} %")

instances = [
    "4.2", "4.3", "4.4", "4.5", "4.6", "4.7", "4.8", "4.9",
    "5.1", "5.2", "5.3", "5.4", "5.5", "5.6", "5.7", "5.8", "5.9",
    "6.1", "6.2", "6.3", "6.4", "6.5",
    "A.1", "A.2", "A.3", "A.4", "A.5",
    "B.1", "B.2"
    ]

parent_dir = "/home/jdnunes/Documents/Heuristics&MetaHeuristics/Assignment 2"
out = "scp-ch4"
print_avg_pc_deviation("simulated-annealing",parent_dir, out, instances)