In [0]:
%pip install pyomo --quiet
dbutils.library.restartPython()

In [0]:
import pyomo.environ as pyo
import pandas as pd
import numpy as np
import mlflow
import os

In [0]:
%sh bash install_cbc.sh

The code block below:

Builds the three-tier network exactly with “close” Tier 3 → Tier 2 links (each Tier-3 node picks the nearest Tier-2 node, with a random tie-break when it sits between two). Guarantees no isolation – every Tier-1 node receives ≥ 1 edge from Tier-2, and every Tier-2 node receives ≥ 1 edge from Tier-3 (the assertion would fail otherwise). Visualises the tiers cleanly with distinctive shapes (●, ■, ▲) and a layered layout.

In [0]:
import random, math
import matplotlib.pyplot as plt
from collections import defaultdict

# ---------------- CONFIG ----------------
N1, N2, N3 = 5, 10, 20               # tier sizes
assert N3 == 2 * N2, "Assuming 2× Tier‑2 nodes in Tier‑3 to keep layout neat"

random.seed(777)                         # set an int for reproducibility

tier1 = [f"T1_{i}" for i in range(1, N1 + 1)]
tier2 = [f"T2_{i}" for i in range(1, N2 + 1)]
tier3 = [f"T3_{i}" for i in range(1, N3 + 1)]

edges = []  # (src, tgt)

# -------- Tier‑2 → Tier‑1 (1–3 out‑edges each, cover all Tier‑1) --------
t2_out = {t2: set() for t2 in tier2}

# Step 1: guarantee each Tier‑1 gets at least one edge
shuffled_t2 = tier2.copy()
random.shuffle(shuffled_t2)
for t1_node, t2_node in zip(tier1, shuffled_t2):
    edges.append((t2_node, t1_node))
    t2_out[t2_node].add(t1_node)

# Step 2: top up to 1–3 edges per Tier‑2 node
for t2_node in tier2:
    desired = random.randint(1, 3)
    while len(t2_out[t2_node]) < desired:
        candidate = random.choice(tier1)
        if candidate not in t2_out[t2_node]:
            edges.append((t2_node, candidate))
            t2_out[t2_node].add(candidate)

# -------- Tier‑3 → Tier‑2 (exactly 1 edge, choose ‘close’ Tier‑2) --------
incoming_t2 = {t2: 0 for t2 in tier2}

# Even‑indexed Tier‑3 nodes (indices 0,2,4,…) are directly below Tier‑2 nodes 0‑9
tier3_even = tier3[::2]
for idx, t3_node in enumerate(tier3_even):
    tgt = tier2[idx]
    edges.append((t3_node, tgt))
    incoming_t2[tgt] += 1

# Odd‑indexed Tier‑3 nodes choose between their two nearest Tier‑2 nodes
tier3_odd = tier3[1::2]
for n, t3_node in enumerate(tier3_odd):
    low = n
    high = min(n + 1, N2 - 1)
    tgt = tier2[random.choice([low, high])] if low != high else tier2[low]
    edges.append((t3_node, tgt))
    incoming_t2[tgt] += 1

# -------- VERIFY --------
deg = defaultdict(int)
for u, v in edges:
    deg[u] += 1
    deg[v] += 1

isolated = [node for node in (*tier1, *tier2, *tier3) if deg[node] == 0]
assert not isolated, f"Isolated nodes detected: {isolated}"

In [0]:
# -------- POSITIONS (layered layout) --------
pos = {}
for i, n in enumerate(tier1):
    pos[n] = (i, 2)
for i, n in enumerate(tier2):
    pos[n] = (i, 1)
for i, n in enumerate(tier3):
    pos[n] = (i * 0.5, 0)

# -------- VISUALISATION --------
fig, ax = plt.subplots(figsize=(15, 6))

# Draw nodes: distinct markers, subtle outline for clarity
ax.scatter([pos[n][0] for n in tier1], [pos[n][1] for n in tier1],
           s=550, marker='o', label="Tier 1", edgecolor='k', linewidth=0.5)

ax.scatter([pos[n][0] for n in tier2], [pos[n][1] for n in tier2],
           s=550, marker='s', label="Tier 2", edgecolor='k', linewidth=0.5)

ax.scatter([pos[n][0] for n in tier3], [pos[n][1] for n in tier3],
           s=450, marker='^', label="Tier 3", edgecolor='k', linewidth=0.5)

# Label nodes
for node, (x, y) in pos.items():
    ax.text(x, y, node, ha='center', va='center', fontsize=8)

# Draw directed edges
for src, tgt in edges:
    sx, sy = pos[src]
    tx, ty = pos[tgt]
    ax.annotate("",
                xy=(tx, ty),
                xytext=(sx, sy),
                arrowprops=dict(arrowstyle="-|>", lw=0.8))

ax.set_ylim(-0.7, 2.7)
ax.axis("off")
plt.title("Three‑Tier Directed Network (No Isolation, Proximity‑Aware Links)")
ax.legend(loc="upper left")
plt.tight_layout()
plt.show()

print("Network built successfully — every node has ≥ 1 incident edge.")
print(f"Total edges: {len(edges)} (Tier‑2→1: {sum(len(s) for s in t2_out.values())}, "
      f"Tier‑3→2: {N3})")

In [0]:
# Properties of the product nodes
f = {'T1_1': 0.1, 'T1_2': 0.15, 'T1_3': 0.2, 'T1_4': 0.15, 'T1_5': 0.1}    # profit margin
d = {'T1_1': 200, 'T1_2': 150, 'T1_3': 50, 'T1_4': 150, 'T1_5': 200}       # demand 

# All part types needed to produce final products
part_types = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's']

# Part types produced at each supplier site
supplier_part_type = {
     'T2_1': 'a', 'T2_2': 'a', 'T2_3': 'a', 'T2_4': 'b', 'T2_5': 'b',
     'T2_6': 'b', 'T2_7': 'c', 'T2_8': 'c', 'T2_9': 'd', 'T2_10': 'd',
     'T3_1': 'e', 'T3_2': 'f', 'T3_3': 'g', 'T3_4': 'h', 'T3_5': 'h',
     'T3_6': 'i', 'T3_7': 'j', 'T3_8': 'k', 'T3_9': 'l', 'T3_10': 'l',
     'T3_11': 'm', 'T3_12': 'n', 'T3_13': 'o', 'T3_14': 'p', 'T3_15': 'p',
     'T3_16': 'q', 'T3_17': 'q', 'T3_18': 'r', 'T3_19': 's', 'T3_20': 's',
     }

# Production capacity of each site
c = {
     'T1_1': 150, 'T1_2': 100, 'T1_3': 200, 'T1_4': 150, 'T1_5': 50,
     'T2_1': 150, 'T2_2': 100, 'T2_3': 200, 'T2_4': 150, 'T2_5': 50,
     'T2_6': 150, 'T2_7': 100, 'T2_8': 200, 'T2_9': 150, 'T2_10': 50,
     'T3_1': 150, 'T3_2': 100, 'T3_3': 200, 'T3_4': 150, 'T3_5': 50,
     'T3_6': 150, 'T3_7': 100, 'T3_8': 200, 'T3_9': 150, 'T3_10': 50,
     'T3_11': 150, 'T3_12': 100, 'T3_13': 200, 'T3_14': 150, 'T3_15': 50,
     'T3_16': 150, 'T3_17': 100, 'T3_18': 200, 'T3_19': 150, 'T3_20': 50,
     }

# Stock at each site
s = {
     'T1_1': 20, 'T1_2': 15, 'T1_3': 5, 'T1_4': 15, 'T1_5': 20,
     'T2_1': 20, 'T2_2': 15, 'T2_3': 5, 'T2_4': 15, 'T2_5': 20,
     'T2_6': 20, 'T2_7': 15, 'T2_8': 5, 'T2_9': 15, 'T2_10': 20,
     'T3_1': 20, 'T3_2': 15, 'T3_3': 5, 'T3_4': 15, 'T3_5': 20,
     'T3_6': 20, 'T3_7': 15, 'T3_8': 5, 'T3_9': 15, 'T3_10': 20,
     'T3_11': 20, 'T3_12': 15, 'T3_13': 5, 'T3_14': 15, 'T3_15': 20,
     'T3_16': 20, 'T3_17': 15, 'T3_18': 5, 'T3_19': 15, 'T3_20': 20,
     }

# Time to recover
t = 10

disrupted = []

In [0]:
from collections import defaultdict

N_minus = defaultdict(set)
for i, j in edges:
    if j in tier1 + tier2:
        N_minus[j].add(supplier_part_type[i])
N_minus = {node: sorted(list(parts)) for node, parts in N_minus.items()}

N_plus = defaultdict(set)
for i, j in edges:
    if i in tier2 + tier3:
        N_plus[i].add(j)
N_plus = {node: sorted(list(childs)) for node, childs in N_plus.items()}

# parent nodes of node j of part type k: dict  (j,k) ↦ list_of_i  (𝒫_{jk})
P = defaultdict(list)
for i, j in edges:
    if j in tier1 + tier2:
        P[(j, supplier_part_type[i])].append(i)

In [0]:
# optimisation_model.py
from pyomo.environ import *

In [0]:
# ------------------------------------------------------------------
# 1.  Prepare your data  (⇦ replace the dummy placeholders)
# ------------------------------------------------------------------
data = {
    # elementary sets ------------------------------------------------
    'V'      : tier1,               # product nodes
    'D'      : tier1 + tier2,       # all BUT leaf nodes
    'U'      : tier2 + tier3,       # all BUT product nodes
    'K'      : part_types,          # part types  (k ∈ 𝒩⁻(j))
    'S'      : disrupted,           # disrupted nodes in scenario n
    # nested sets ----------------------------------------------------
    'N_minus' : N_minus,         # parts required to produce node j: dict  j ↦ list_of_k   (𝒩⁻(j))
    'N_plus'  : N_plus,          # child nodes of node i: dict  i ↦ list_of_j   (𝒩⁺(i))
    'P'       : P,               # parent nodes of node j of part type k: dict  (j,k) ↦ list_of_i  (𝒫_{jk})
    # parameters -----------------------------------------------------
    'f'  : f,    # profit margin of 1 unit of j
    's'  : s,    # finished-goods inventory of i
    't'  : t,    # TTR for disruption scenario n   (a scalar)
    'd'  : d,    # demand for j per TTR
    'c'  : c,    # plant capacity per TTR
}

In [0]:
# ------------------------------------------------------------------
# 2.  Build the ConcreteModel
# ------------------------------------------------------------------

m = ConcreteModel()

# ---- 2.1  sets ----------------------------------------------------
m.V   = Set(initialize=data['V'])
m.D   = Set(initialize=data['D'])
m.U   = Set(initialize=data['U'])
m.K   = Set(initialize=data['K'])
m.S   = Set(initialize=data['S'])

m.N_minus = Set(m.D, initialize=lambda mdl, j: data['N_minus'][j])
m.N_plus  = Set(m.U, initialize=lambda mdl, i: data['N_plus'][i])

# handy union of *all* nodes that may carry production volume
m.NODES = pyo.Set(initialize=list(
        set(data['V']) | set(data['U'])
    ))

# 𝒫_{jk} as map (j,k) → list_of_i
m.P = Set(dimen=3, initialize=[
        (i, j, k)
        for (j, k), I in data['P'].items()
        for i in I
    ])

# ---- 2.2  parameters ---------------------------------------------
m.f = Param(m.V, initialize=data['f'], within=NonNegativeReals)          # impact (profit margin)
m.s = Param(m.NODES, initialize=data['s'], within=NonNegativeIntegers)
m.t = Param(initialize=data['t'], within=PositiveReals)
m.d = Param(m.V, initialize=data['d'], within=NonNegativeIntegers)
m.c = Param(m.NODES, initialize=data['c'], within=NonNegativeIntegers)

# ---- 2.3  decision variables -------------------------------------
m.u = Var(m.NODES, domain=NonNegativeIntegers)      # production quantity of node i
m.l = Var(m.V, domain=NonNegativeIntegers)          # lost volume of product j
# y is only needed for (i,j) pairs that actually make sense
m.y_index = Set(within=m.U * m.NODES, initialize=lambda mdl: [
        (i, j) for i in mdl.U for j in mdl.N_plus[i]
    ])
m.y = Var(m.y_index, domain=NonNegativeIntegers)

In [0]:
# ------------------------------------------------------------------
# 3.  objective
# ------------------------------------------------------------------

def obj_rule(mdl):
    return sum(mdl.f[j] * mdl.l[j] for j in mdl.V)
m.OBJ = Objective(rule=obj_rule, sense=minimize)

In [0]:
# ------------------------------------------------------------------
# 4.  constraints
# ------------------------------------------------------------------
# u_j − Σ_{i∈𝒫_{jk}} y_ij  ≤ 0,           ∀j∈𝒟, ∀k∈𝒩⁻(j)
def bom_production_rule(mdl, j, k):
    rhs = sum(mdl.y[i, j] for i in data['P'][(j, k)])
    return mdl.u[j] - rhs <= 0
m.BomProduction = Constraint([(j, k) for j in m.D for k in m.N_minus[j]], rule=bom_production_rule)

# Σ_{j∈𝒩⁺(i)} y_ij − u_i ≤ s_i,            ∀ i∈𝒰
def flow_balance_rule(mdl, i):
    return sum(mdl.y[i, j] for j in mdl.N_plus[i]) - mdl.u[i] <= mdl.s[i]
m.FlowBalance = Constraint(m.U, rule=flow_balance_rule)

# u_j = 0,                                 ∀ j∈𝒮⁽ⁿ⁾
m.Disrupted = Constraint(m.S, rule=lambda m, j: m.u[j] == 0)

# l_j + u_j + s_j ≥ d_j · t⁽ⁿ⁾,            ∀ j∈𝒱
def demand_rule(mdl, j):
    return mdl.l[j] + mdl.u[j] + mdl.s[j] >= mdl.d[j] * mdl.t
m.Demand = Constraint(m.V, rule=demand_rule)

# Σ_{k∈𝒜_α} u_k ≤ c_α · t⁽ⁿ⁾,                ∀ j∈NODES
def capacity_rule(mdl, j):
    return mdl.u[j] <= mdl.c[j] * mdl.t
m.Capacity = Constraint(m.NODES, rule=capacity_rule)

In [0]:
# ------------------------------------------------------------------
# 5.  solve
# ------------------------------------------------------------------
if __name__ == "__main__":
    # choose any LP/MIP solver that Pyomo can see (CBC, Gurobi, CPLEX, HiGHS, …)
    solver = SolverFactory("cbc")      # just an example
    result = solver.solve(m, tee=True)
    m.display()                        # quick sanity-check of results
