# 2. Programmierprojekt: Local Search

## $n$-Damen Problem

Wir modellieren das $n$-Damen Problem wie folgt: Jeder Zustand im Suchraum ist eine Permutation des Vektors $(0, \dotsc, n-1)$. Damit sind die Aktionen die von einem Zustand möglich sind Vertauschungen von Zahlen.

In [None]:
import random


def generate_random_conf(n: int) -> list:
    # Generate random permutation of (0,...,n-1)
    conf = [i for i in range(n)]
    random.shuffle(conf)
    return conf


def swap(conf, i, j):
    # Swap positions i,j inplace
    conf[i], conf[j] = conf[j], conf[i]

In [None]:
conf = generate_random_conf(8)
swap(conf, 3, 5)
conf

Implentieren Sie die folgende Funktion `conflicts`, welche für jede Dame die Anzahl der Bedrohungen aufsummiert. Überlegen Sie, welche Art von Bedrohungen durch die Modellierung überhaupt möglich sind.

In [None]:
def conflicts(queens) -> int:
    """ Heuristic that indicates number of beatable queens on the right """

    conflicts = 0
    for i in range(len(queens)):
        # For better initialization
        if queens[i] == float("-inf"):
            continue
        for j in range(i + 1, len(queens)):
            conflicts += int(queens[i] == queens[j] or queens[i] + j - i == queens[j] or queens[i] - j + i == queens[j])
    return conflicts

In [None]:
conf1 = [0, 1, 2, 3, 4, 5, 6, 7]  # conflicts = 28
conf2 = [17, 22, 11, 5, 2, 6, 12, 15, 21, 19, 10, 8, 0, 3, 1, 20, 23, 9, 14, 18, 13, 24, 16, 4, 7]  # conflicts = 0
conf3 = [15, 12, 9, 16, 0, 18, 6, 11, 19, 7, 13, 3, 10, 4, 1, 2, 8, 5, 14, 17]  # conflicts = 12

assert conflicts(conf1) == 28
assert conflicts(conf2) == 0
assert conflicts(conf3) == 12

Nutzen sie die obige Funktion als Heuristik für den A*-Algorithmus. Testen sie bis zu welchem $n$ der Algorithmus eine Lösung in unter zwei Minuten findet. Starten Sie dabei immer von einer zufälligen Startkonfiguration.

In [None]:
from queue import PriorityQueue


def queens_a_star(n: int) -> list[int]:
    """ Find solution with A*; not feasible for n >= 50 """

    conf = generate_random_conf(n)

    queue = PriorityQueue()
    # stores f, h, g, configuration
    queue.put((conflicts(conf), conflicts(conf), 0, conf))
    visited = []

    while not queue.empty():
        entry = queue.get()
        g_score, state = entry[2], entry[3]
        visited.append(state)

        for successor in successors_astar(state):
            if not conflicts(successor):
                return successor
            if successor in visited:
                continue

            new_g_score = g_score + 1
            h = conflicts(successor)
            f_score = h + new_g_score
            queue.put((f_score, h, new_g_score, successor))
            visited.append(successor)

    return [-1]


def successors_astar(queens) -> list[list[int]]:
    """ Returns list of successor states; action is swapping """
    successors = []
    for i in range(len(queens)):
        for j in range(i + 1, len(queens)):
            copy = queens.copy()
            swap(copy, i, j)
            successors.append(copy)
    return successors

Wir wollen im Folgenden einen Local-Search Ansatz nutzen, um das Problem zu lösen. Implementieren Sie nun den Hill-Climbing Algorithmus aus der Vorlesung. Der Algorithmus sollte zusätzlich maximal $k$ Seitwärtszüge erlauben.

In [None]:
def queens_hill_climb(n, k=0) -> tuple[list[int], int]:
    return hill_climb_core(generate_random_conf(n), k)


def hill_climb_core(conf, k=0) -> tuple[list[int], int]:
    """ Searches solution by hill-climbing; k: possible side steps """
    conflicts_amount = conflicts(conf)
    improvement, side_steps = 1, 0
    visited = []

    while side_steps <= k and conflicts_amount != 0:
        visited.append(conf)
        new_conf, new_conflict_amount = successors(conf, visited)

        improvement = conflicts_amount - new_conflict_amount

        if improvement:
            side_steps = 0
        else:
            side_steps += 1

        conf, conflicts_amount = new_conf, new_conflict_amount

    return conf, conflicts(conf)


def successors(queens, visited) -> list[list[int]]:
    """ Returns best successor"""
    best_successor = (queens, conflicts(queens))
    for i in range(len(queens)):
        for j in range(i + 1, len(queens)):
            copy = queens.copy()
            swap(copy, i, j)
            best_successor = (copy, conflicts(copy)) if (
                        conflicts(copy) <= best_successor[1] and copy not in visited) else best_successor
    return best_successor


Evaluieren Sie den Algorithmus empirisch. Testen sie für verschiedene $n$ und $k$, wie groß die Erfolgsrate des Algorithmus ist. Überlegen Sie sich eine geeignete Visualisierung ihrer Ergebnisse.

In [None]:
n_samples = 40
k = [0, 5, 10, 20, 50, 100]

total_amount, total_hits = 0, 0
for i in k:
    amount, hits = 0, 0
    for n in range(8, n_samples):
        amount += 1
        hits += int(not queens_hill_climb(n, i)[1])
    print("Success rate for k = " + str(i) + " : " + str((hits / amount) * 100) + "%")
    total_amount += amount
    total_hits += hits
print("Overall success rate: " + str((total_hits / total_amount) * 100) + "%")

Implementieren sie nun Hill-Climbing mit Random-Restarts, um immer optimale Lösungen zu finden. Berechnen Sie auch, wie viele Restarts nötig waren, um eine optimale Lösung zu finden. Bis zu welchem $n$ können sie mit diesem Ansatz in weniger als zwei Minuten Lösungen finden? (Probieren sie auch verschiedene $k$)

In [None]:
def queens_random_restart(n, k=0) -> tuple[list[int], int]:
    """ Restart hill climbing till solution is found; not feasible for n >= 65 """
    restart = 0
    conf, conflict_amount = queens_hill_climb(n, k)
    while conflict_amount:
        restart += 1
        conf, conflict_amount = queens_hill_climb(n, k)
    return conf, restart

Eine weitere Verbesserung kann erreicht werden, in dem die initiale Konfiguration nicht rein zufällig gewählt wird. Es kann versucht werden, anfangs eine Konfiguration zu finden, in der möglichst viele Damen bereits konfliktfrei gesetzt sind. Die restlichen Damen sollten dann mit möglichst wenigen Konflikten gesetzt werden.

Implementieren Sie darauf basierend einen verbesserten Generator für die Startkonfiguration und testen Sie, ob damit noch größere Probleminstanzen gelöst werden können.

In [None]:

import numpy as np


def better_initial_configuration(n: int, k=0) -> tuple[list[int], int]:
    """ Initialize conf with minimal conflicts """
    conf = [float("-inf")] * n
    possible_queens = [i for i in range(n)]
    conf[0] = possible_queens.pop(randrange(n))
    f = np.zeros((n, n), int)
    update(f, 0, conf[0])
    for i in range(1, n):
        copy = conf.copy()
        min_conflicts = possible_queens[0], float("inf")  # index, overlap_amount

        for j in possible_queens:
            f_c = np.copy(f)
            update(f_c, i, j)
            if f_c.sum() <= min_conflicts[1]:
                min_conflicts = j, f_c.sum()

        conf[i] = min_conflicts[0]
        update(f, i, conf[i])
        possible_queens.remove(conf[i])
    return hill_climb_core(conf, k)


def update(f, x, y):
    f[:, x] = 0
    f[y, :] = 0
    i, j = 1, 1

    while x + i < f.shape[0] and y + i < f.shape[0]:
        f[y + i, x + i] = 1
        i += 1

    while x + j < f.shape[0] and y - j > -1:
        f[y - j, x + j] = 1
        j += 1

In [None]:
n_samples = 40
k = [0, 5, 10, 20, 50, 100]

total_amount, total_hits = 0, 0
for i in k:
    amount, hits = 0, 0
    for n in range(8, n_samples):
        amount += 1
        hits += int(not better_initial_configuration(n, i)[1])
    print("Success rate for k = " + str(i) + " : " + str((hits / amount) * 100) + "%")
    total_amount += amount
    total_hits += hits
print("Overall success rate: " + str((total_hits / total_amount) * 100) + "%")

## Travelling-Salesman Problem

In diesem Teil soll das TSP mithilfe von Local Search approximiert werden. Für diese Aufgabe betrachten wir ausschließlich das symmetrische TSP, bei dem die Kanten der Graphen ungerichtet sind. Es gibt folglich für einen Graph nur eine Tour.

Wir verwenden zur Darstellung der Graphen das Paket `networkx` (https://networkx.org/documentation/latest/). Außerdem das Paket `tsplib95` (https://tsplib95.readthedocs.io/en/stable/index.html) um die Algorithmen mit Benchmarks zu testen.

In [None]:
import networkx as nx
import tsplib95
from random import randrange
from pathlib import Path
import math

Implementieren Sie die Funktion `import_benchmarks`, welche die verschiedenen TSP Instanzen zusammen mit den Lösungen aus dem Ordner `tsp_benchmarks` importiert und eine Liste aus Tupeln der Form `(G, optimal_solution)` zurückgibt, bestehend aus dem Graphen als `networkx`-Objekt und dem Gewicht der optimalen Lösung.

In [None]:
def import_benchmarks(path='./tsp_benchmarks') -> list[tuple[nx.Graph, int]]:
    tsp_list = []
    paths = Path(path).rglob('*.opt.tour')
    for filepath in paths:
        solution = tsplib95.load(str(filepath))
        problem = tsplib95.load(str(filepath)[:-8] + 'tsp')

        # large graphs are not feasible for our algorithms
        if len(list(problem.get_nodes())) > 70:
            continue

        try:
            optimal_weight = problem.trace_tours(solution.tours)[0]
            tsp_list.append((problem.get_graph(normalize=True), optimal_weight))
        except:
            pass
    return tsp_list


Implementieren Sie nun die folgenden drei Local-Search Algorithmen um das TSP zu lösen. Genau wie beim $n$-Damen Problem, kann auch beim TSP eine Lösung als Permutation der Knoten des Graphen gesehen werden. Für das Hill-Climbing und dem Simulated Annealing, besteht eine Aktion daraus, zwei Knoten auf dem Rundweg zu vertauschen. Beim EX3-Algorithmus ist die Aktion unten beschrieben.

**1. Simple Hill-Climbing**

In [None]:
def successors_hc(G, conf, visited) -> list[list[int]]:
    """ Returns best successor """
    best_successor = (conf, calculate_tour_cost(G, conf))
    for i in range(len(conf)):
        for j in range(len(conf)):
            if i == j:
                continue
            copy = conf.copy()
            swap(copy, i, j)
            best_successor = (copy, calculate_tour_cost(G, copy)) if (
                        calculate_tour_cost(G, copy) <= best_successor[1] and copy not in visited) else best_successor
    return best_successor


def tsp_hill_climb(G: nx.Graph, succ=successors_hc) -> int:
    """ Simple Hill Climbing """
    conf = list(G.nodes)
    tour_cost = calculate_tour_cost(G, conf)
    visited = []
    improvement = 1
    while improvement:
        visited.append(conf)
        new_conf, new_tour_cost = succ(G, conf, visited)
        improvement = tour_cost - new_tour_cost

        conf, tour_cost = new_conf, new_tour_cost
    return calculate_tour_cost(G, conf)


def calculate_tour_cost(G, t) -> int:
    """ Given tour add start to end, then return cost """
    total_cost = 0
    tour = t + [t[0]]
    for i in range(len(tour) - 1):
        u, v = tour[i], tour[i + 1]
        # Add the weight of the edge (u, v)
        total_cost += G[u][v]['weight']
    return total_cost

**3. Simulated Annealing**

In [None]:
def tsp_simulated_annealing(G: nx.Graph, temperature: float) -> int:
    """ Takes worse steps dependent on temperature """
    conf = list(G.nodes)
    tour_cost = calculate_tour_cost(G, conf)
    visited = []

    while temperature > 0:

        visited.append(conf)
        successor = conf.copy()
        i, j = randrange(len(conf)), randrange(len(conf))
        swap(successor, i, j)
        new_tour_cost = calculate_tour_cost(G, successor)

        delta = tour_cost - new_tour_cost
        if delta > 0:
            conf = successor
            tour_cost = new_tour_cost
        else:
            p = math.exp(delta / temperature)
            if random.random() <= p:
                conf = successor
                tour_cost = new_tour_cost

        temperature -= 0.1
    return tour_cost

**2. EX3-Algorithmus**

Der EX3-Algorithmus funktioniert ähnlich wie Hill-Climbing, benutzt allerdings eine etwas andere Nachfolgerfunktion. Die Nachfolger eines Rundweges werden bestimmt, indem zunächst zufällig **drei** Kanten aus dem bisherigen Rundweg ausgewählt werden. Dies zerlegt den Rundweg in drei Teilrundwege. Für das erneute zusammenfügen gibt es danach 8 Möglichkeiten (wie im Bild unten zu sehen ist). Der Algorithmus testet nun für jede 3 Kanten alle Nachfolger und wählt denjenigen aus, der die Kosten am stärksten verringert. Dies wird so lange gemacht, bis sich der Rundweg nicht mehr verkürzt.

![EX3 Visualisation](three_opt.png "EX3")

In [None]:
def successors_ex3(G, conf, visited) -> list[list[int]]:
    """ Returns best successor of ex3 """
    best_successor = (conf, calculate_tour_cost(G, conf))
    l = len(conf)
    n = sorted([randrange(l), randrange(l), randrange(l)])
    while n[1] - n[0] < 2 or n[2] - n[1] < 2:
        n = sorted([randrange(l), randrange(l), randrange(l)])
    x, y, z = n[0], n[1], n[2]
    copy = conf.copy()
    edge_z = copy[x:x + 2]
    edge_o = copy[y:y + 2]
    edge_t = copy[z:z + 2]

    z_to_o = copy[x + 2:y]
    o_to_t = copy[y + 2:z]
    t_to_z = copy[z + 2::] + copy[0:x]

    a = edge_z + z_to_o + edge_o + o_to_t + edge_t + t_to_z
    b = edge_z[::-1] + z_to_o + edge_o + o_to_t + edge_t + t_to_z
    c = edge_z + z_to_o + edge_o[::-1] + o_to_t + edge_t + t_to_z
    d = edge_z + z_to_o + edge_o + o_to_t + edge_t[::-1] + t_to_z
    e = edge_z[::-1] + z_to_o + edge_o[::-1] + o_to_t + edge_t + t_to_z
    f = edge_z[::-1] + z_to_o + edge_o + o_to_t + edge_t[::-1] + t_to_z
    g = edge_z + z_to_o + edge_o[::-1] + o_to_t + edge_t[::-1] + t_to_z
    h = edge_z[::-1] + z_to_o + edge_o[::-1] + o_to_t + edge_t[::-1] + t_to_z
    possible_successors = [a, b, c, d, e, f, g, h]

    for s in possible_successors:
        best_successor = (s, calculate_tour_cost(G, s)) if (
                    calculate_tour_cost(G, s) <= best_successor[1] and s not in visited) else best_successor
    return best_successor


def tsp_ex3(G: nx.Graph) -> int:
    return tsp_hill_climb(G, successors_ex3)


**Evaluierung**

Vergleichen Sie die drei Algorithmen miteinander. Nutzen Sie dafür die bereitgestellten Benchmarks und die Länge der Optimallösung. Sie sollten Ihre Ergebnisse geeignet visualisieren.

In [None]:
tsp_list = import_benchmarks()
average_deviation_hc, average_deviation_sa, average_deviation_ex = 0, 0, 0
for tsp in tsp_list:
    hc, sa, ex, opt = tsp_hill_climb(tsp[0]), tsp_simulated_annealing(tsp[0], 1000), tsp_ex3(tsp[0]), tsp[1]
    average_deviation_hc += (hc / opt - 1) * 100
    average_deviation_sa += (sa / opt - 1) * 100
    average_deviation_ex += (ex / opt - 1) * 100
l = len(tsp_list)
print("Average deviation from optimum    HC: " + str(average_deviation_hc / l) + "%, SA: " + str(
    average_deviation_sa / l) + "%, EX3: " + str(average_deviation_ex / l) + "%") 