<a href="https://colab.research.google.com/github/sazzeb/simulated-annealing/blob/main/Project_5210_part2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from collections import defaultdict, deque
import random, math
random.seed(42)


In [2]:
def build_graph(capacity_dict):
    """Build internal graph dict from a capacity dict mapping (u,v)->cap."""
    g = {'capacity': dict(capacity_dict), 'edges': {}}
    for (u, v), cap in g['capacity'].items():
        if cap < 0:
            raise ValueError(f"Negative capacity on edge {(u,v)}")
        g['edges'].setdefault(u, []).append(v)
    return g


In [3]:

def heuristic_total_inflow_to_sink(flow, sink):
    """(A.a) Objective: total inflow into sink."""
    return sum(f for (u, v), f in flow.items() if v == sink)

In [4]:

def make_residual(graph, flow):
    """
    Build residual adjacency and attributes for edges.
    For base edge (u,v) with capacity C and flow F:
      - if F < C: forward residual u->v with residual = C-F
      - if F > 0: backward residual v->u with residual = F
    """
    res_adj = defaultdict(list)
    res_attr = {}  # (u,v) -> dict(residual, forward, base)
    cap = graph['capacity']
    for (u, v), C in cap.items():
        F = flow.get((u, v), 0)
        if F < C:
            res_adj[u].append(v)
            res_attr[(u, v)] = {'residual': C - F, 'forward': True,  'base': (u, v)}
        if F > 0:
            res_adj[v].append(u)
            res_attr[(v, u)] = {'residual': F,     'forward': False, 'base': (u, v)}
    return res_adj, res_attr


In [5]:
def make_predecessors(res_adj):
    """Compute predecessor lists (for sink→source BFS over residual)."""
    preds = defaultdict(list)
    for u, nbrs in res_adj.items():
        for v in nbrs:
            preds[v].append(u)
    return preds

In [6]:
def bfs_sink_to_source(res_adj, res_attr, sink, source):
    """
    (A.b) Sink→Source BFS over residual by walking predecessors.
    Returns:
      path_sink_to_source: [sink,...,source]
      app_edges:           residual edges in source→sink order
      m:                   bottleneck residual along app_edges
    """
    preds = make_predecessors(res_adj)
    parent = {sink: None}
    q = deque([sink])
    while q:
        u = q.popleft()
        for p in preds.get(u, []):  # predecessors of u
            if p in parent:
                continue
            parent[p] = u
            if p == source:
                # reconstruct sink->...->source
                rev = [source]
                while rev[-1] is not None:
                    nxt = parent[rev[-1]]
                    rev.append(nxt)
                    if nxt is None:
                        break
                path_s2r = list(reversed(rev[:-1]))  # [sink,...,source]
                # application edges should be in source->sink order
                path_r2s = list(reversed(path_s2r))
                app_edges = list(zip(path_r2s[:-1], path_r2s[1:]))
                if not app_edges:
                    return None, None, 0
                m = min(res_attr[(a, b)]['residual'] for (a, b) in app_edges)
                return path_s2r, app_edges, m
            q.append(p)
    return None, None, 0

In [8]:
def successor_SA(graph, flow, source, sink):
    """
    (A.b) Successor via residual augmentation with random increment f∈[1..m].
    Returns (new_flow, path_sink_to_source, f) or (None, None, 0) if no path.
    """
    res_adj, res_attr = make_residual(graph, flow)

    # quick residual reachability check from source to sink
    dq = deque([source]); seen = {source}; reachable_sink = False
    while dq:
        u = dq.popleft()
        if u == sink:
            reachable_sink = True
            break
        for v in res_adj.get(u, []):
            if v not in seen:
                seen.add(v); dq.append(v)
    if not reachable_sink:
        return None, None, 0

    path_s2r, app_edges, m = bfs_sink_to_source(res_adj, res_attr, sink, source)
    if not app_edges:
        return None, None, 0

    f = random.randint(1, m)  # random increment per spec
    new_flow = dict(flow)
    for (u, v) in app_edges:
        attr = res_attr[(u, v)]
        base_u, base_v = attr['base']
        if attr['forward']:
            new_flow[(base_u, base_v)] = new_flow.get((base_u, base_v), 0) + f
        else:
            new_flow[(base_u, base_v)] = new_flow.get((base_u, base_v), 0) - f
        # clamp to [0, capacity]
        C = graph['capacity'][(base_u, base_v)]
        new_flow[(base_u, base_v)] = max(0, min(C, new_flow[(base_u, base_v)]))
    return new_flow, path_s2r, f

In [9]:
def simulated_annealing(graph, source, sink, T_init=100.0, alpha=0.99, max_iter=2000, no_path_break=True):
    """
    (A.c) SA driver with standard acceptance & multiplicative cooling.
    Returns (best_network, best_value).
    """
    flow = defaultdict(int)
    best_flow = dict(flow)
    def h(F): return heuristic_total_inflow_to_sink(F, sink)

    current = h(flow); best = current
    T = float(T_init)

    for _ in range(max_iter):
        new_flow, path_s2r, inc = successor_SA(graph, flow, source, sink)
        if new_flow is None:
            if no_path_break:
                break
            else:
                T *= alpha
                if T <= 1e-12:
                    break
                continue

        nxt = h(new_flow)
        delta = nxt - current
        if delta > 0 or random.random() < math.exp(delta / max(T, 1e-12)):
            flow = new_flow
            current = nxt
            if current > best:
                best = current
                best_flow = dict(flow)

        T *= alpha
        if T <= 1e-12:
            break

    return {'capacity': graph['capacity'], 'flow': dict(best_flow)}, best

In [10]:

def verify_part_A_on_small_example():
    cap = {(0,1):4, (0,2):5, (1,3):3, (2,3):6}
    g = build_graph(cap)
    net, tf = simulated_annealing(g, 0, 3, T_init=100, alpha=0.99, max_iter=1000)
    print(f"[Part A demo] Total inflow to sink (3) = {tf}")
verify_part_A_on_small_example()


[Part A demo] Total inflow to sink (3) = 8


In [11]:
NAME = {'S':0,'U':1,'V':2,'W':3,'Z':4,'E':29}
S,U,V,W,Z,E = NAME['S'], NAME['U'], NAME['V'], NAME['W'], NAME['Z'], NAME['E']

In [13]:
def r1_samples():
    return [
        ("R1-01", {(S,U):10,(S,V):8,(U,W):5,(U,Z):4,(V,W):6,(V,Z):3,(W,E):9,(Z,E):8}),
        ("R1-02", {(S,U):7,(S,V):7,(U,W):6,(V,W):5,(U,Z):4,(V,Z):4,(W,Z):2,(W,E):6,(Z,E):7}),
        ("R1-03", {(S,U):9,(S,V):5,(U,V):3,(V,W):7,(U,Z):4,(W,E):8,(Z,E):6}),
        ("R1-04", {(S,U):12,(S,V):4,(U,W):5,(V,W):4,(U,Z):6,(V,Z):2,(W,Z):3,(W,E):6,(Z,E):10}),
        ("R1-05", {(S,U):6,(S,V):11,(U,V):2,(U,W):3,(V,W):8,(V,Z):5,(W,E):9,(Z,E):4}),
        ("R1-06", {(S,U):10,(U,W):10,(S,V):10,(V,Z):10,(W,Z):5,(W,E):4,(Z,E):12}),
        ("R1-07", {(S,U):5,(S,V):9,(U,W):5,(V,W):5,(U,Z):1,(V,Z):2,(W,E):7,(Z,E):3}),
        ("R1-08", {(S,U):8,(S,V):8,(U,V):4,(V,U):3,(U,W):6,(V,Z):6,(W,Z):2,(W,E):6,(Z,E):6}),
        ("R1-09", {(S,U):15,(S,V):2,(U,W):5,(U,Z):10,(V,W):2,(W,E):7,(Z,E):10}),
        ("R1-10", {(S,U):4,(S,V):4,(U,W):4,(V,W):4,(U,Z):4,(V,Z):4,(W,E):4,(Z,E):4}),
        ("R1-11", {(S,U):9,(S,V):9,(U,W):3,(V,W):7,(U,Z):7,(V,Z):3,(W,Z):2,(W,E):8,(Z,E):8}),
        ("R1-12", {(S,U):6,(S,V):6,(U,Z):6,(V,W):6,(W,Z):3,(W,E):7,(Z,E):5}),
    ]

In [14]:
def run_R1(T_init=100, alpha=0.995, max_iter=2000):
    results = []
    for label, cap in r1_samples():
        g = build_graph(cap)
        net, tf = simulated_annealing(g, S, E, T_init=T_init, alpha=alpha, max_iter=max_iter)
        print(f"{label}: SA total inflow to E = {tf}")
        results.append((label, tf, net))
    return results

_ = run_R1()

R1-01: SA total inflow to E = 16
R1-02: SA total inflow to E = 13
R1-03: SA total inflow to E = 11
R1-04: SA total inflow to E = 15
R1-05: SA total inflow to E = 13
R1-06: SA total inflow to E = 16
R1-07: SA total inflow to E = 10
R1-08: SA total inflow to E = 12
R1-09: SA total inflow to E = 17
R1-10: SA total inflow to E = 8
R1-11: SA total inflow to E = 16
R1-12: SA total inflow to E = 11


In [15]:
def random_digraph_capacities(n=30, p=0.1, cap_lo=1, cap_hi=10, seed=None):
    if seed is not None:
        random.seed(seed)
    cap = {}
    for u in range(n):
        for v in range(n):
            if u == v:
                continue
            if random.random() < p:
                cap[(u, v)] = random.randint(cap_lo, cap_hi)
    return cap


In [16]:

def has_path(capacity, source, sink):
    adj = defaultdict(list)
    for (u, v), C in capacity.items():
        if C > 0:
            adj[u].append(v)
    dq = deque([source]); seen = {source}
    while dq:
        u = dq.popleft()
        if u == sink:
            return True
        for v in adj[u]:
            if v not in seen:
                seen.add(v); dq.append(v)
    return False

In [17]:
def edmonds_karp_max_flow_value(capacity, source, sink):
    """
    Classic EK on our capacity dict without external deps.
    """
    res = defaultdict(int)
    adj = defaultdict(list)
    for (u, v), C in capacity.items():
        res[(u, v)] += C
        adj[u].append(v)
        adj[v].append(u)  # ensure residual back-edges accessible

    def bfs_residual_path():
        parent = {source: None}
        dq = deque([source])
        while dq:
            u = dq.popleft()
            for v in adj[u]:
                if v not in parent and res[(u, v)] > 0:
                    parent[v] = u
                    if v == sink:
                        # reconstruct & bottleneck
                        path = [sink]
                        while parent[path[-1]] is not None:
                            path.append(parent[path[-1]])
                        path.reverse()
                        bott = min(res[(path[i], path[i+1])] for i in range(len(path)-1))
                        return path, bott
                    dq.append(v)
        return None, 0

    maxflow = 0
    while True:
        path, bott = bfs_residual_path()
        if not path:
            break
        for i in range(len(path)-1):
            u, v = path[i], path[i+1]
            res[(u, v)] -= bott
            res[(v, u)] += bott
        maxflow += bott
    return maxflow


In [18]:
def run_R2(n=30, p=0.1, seed=7):
    source, sink = 0, 29
    cap = random_digraph_capacities(n=n, p=p, seed=seed)
    g = build_graph(cap)

    # SA
    if has_path(cap, source, sink):
        _, tf_sa = simulated_annealing(g, source, sink, T_init=200, alpha=0.995, max_iter=3000)
    else:
        tf_sa = 0

    # EK
    tf_ek = edmonds_karp_max_flow_value(cap, source, sink)

    print(f"R2 — SA tf={tf_sa}, Edmonds–Karp tf_net={tf_ek}")
    return tf_sa, tf_ek

_ = run_R2(n=30, p=0.10, seed=7)

R2 — SA tf=3, Edmonds–Karp tf_net=3


In [19]:
def run_R3(num_graphs=30, n=30, p=0.10, seed=42):
    source, sink = 0, 29
    random.seed(seed)

    def sa_avg(T_init):
        vals = []
        for i in range(num_graphs):
            cap = random_digraph_capacities(n=n, p=p, seed=seed + i)
            g = build_graph(cap)
            if has_path(cap, source, sink):
                _, tf = simulated_annealing(g, source, sink, T_init=T_init, alpha=0.995, max_iter=2500)
            else:
                tf = 0
            vals.append(tf)
        return sum(vals)/len(vals) if vals else 0.0

    avg100  = sa_avg(100)
    avg1000 = sa_avg(1000)
    print(f"R3 — tf_net_avg(100)  = {avg100:.3f}")
    print(f"R3 — tf_net_avg(1000) = {avg1000:.3f}")
    if abs(avg1000 - avg100) > 0.05*max(1e-6, avg100):
        print("Interpretation: Higher T explores more → greater chance to escape local minima.")
    else:
        print("Interpretation: Similar averages — landscape/cooling likely adequate at T=100.")
    return avg100, avg1000

_ = run_R3(num_graphs=30, n=30, p=0.10, seed=42)


R3 — tf_net_avg(100)  = 9.733
R3 — tf_net_avg(1000) = 9.733
Interpretation: Similar averages — landscape/cooling likely adequate at T=100.
