In [1]:
# Cell 1
import os, json, time, math, itertools
from collections import defaultdict
import numpy as np
import pandas as pd
import torch
import multiprocessing as mp

import gurobipy as gp

# Input data directory. Set the NETWORK_JSON_DIR environment variable to point
# the notebook at another location; when it is unset the original workstation
# path is used, so existing runs behave exactly as before.
BASE_DIR = os.environ.get(
    "NETWORK_JSON_DIR",
    r"D:\MINJI\NETWORK RELIABILITY\QGIS\7.Korea_Full\json",
)
edge_fp   = os.path.join(BASE_DIR, "edges.json")
route_fp  = os.path.join(BASE_DIR, "routes_nodes.json")
demand_fp = os.path.join(BASE_DIR, "demand.json")
dept_fp   = os.path.join(BASE_DIR, "dep_time.json")

# Reproducibility: seed both random-number libraries used by the notebook.
GLOBAL_SEED = 42
torch.manual_seed(GLOBAL_SEED)
np.random.seed(GLOBAL_SEED)


In [2]:
# Cell 2
# 0-b. Fixed parameters
# T: time indices 0..T are used when the time-expanded arcs are built
#    (see build_arc_and_caches).
T        = 12
# max_wait: longest waiting arc generated per node; 0 means no waiting arcs.
max_wait = 0
# CAPACITY: right-hand side of the capacity cuts added in solve_one_sample.
CAPACITY = 3
# Objective weights used in build_base_model: w1 weights the demand of trains
# with h = 0, w2 the demand of OD pairs with z = 0, w3 the delay term.
w1, w2, w3 = 1000, 50, 10
# BIG_M: big-M constant for the indicator-style constraints of the model.
BIG_M = 10**6
# SINK: name of the artificial sink node added to the node set below.
SINK  = "SINK"

# 0-c. Load the JSON inputs
# edges: edge id -> (source node, destination node, integer travel time tau)
with open(edge_fp, encoding="utf-8") as f:
    edges_raw = json.load(f)
edges = {eid: (src, dst, int(tau)) for eid, (src, dst, tau) in edges_raw.items()}

# routes_nodes: train id -> sequence of nodes on its route
with open(route_fp, encoding="utf-8") as f:
    routes_nodes = json.load(f)

# dep_time: train id -> integer departure time
with open(dept_fp, encoding="utf-8") as f:
    dep_time = {tr: int(t) for tr, t in json.load(f).items()}

# demand: train id -> list of (origin, destination, quantity as float)
with open(demand_fp, encoding="utf-8") as f:
    dem_raw = json.load(f)
demand = {tr: [(o, d, float(q)) for o, d, q in lst] for tr, lst in dem_raw.items()}

# Train ids in route-file order, and every node that appears as an edge
# endpoint, plus the artificial sink.
trains = list(routes_nodes)
nodes  = {n for _, (s, d, _) in edges.items() for n in (s, d)}
nodes.add(SINK)

# Build the e1/e1r ... e95/e95r pairs (only those whose two ids both exist in edges)
edge_pairs = []
for i in range(1, 96):
    e  = f"e{i}"
    er = f"e{i}r"
    if e in edges and er in edges:
        edge_pairs.append((e, er))
NUM_PAIRS = len(edge_pairs)
print(f"[Main] Pairs detected: {NUM_PAIRS}")


[Main] Pairs detected: 95


In [3]:
# Cell 3
# Sample failures on the GPU when CUDA is available, otherwise on the CPU.
DEVICE_SAMPLING = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[Main] Sampling device: {DEVICE_SAMPLING}")

def sample_failed_sets(batch_size: int,
                       p_low=0.05, p_high=0.20,
                       prob_mode='fixed',
                       device: str = DEVICE_SAMPLING):
    """Draw random failure scenarios over the detected edge pairs.

    Each pair in ``edge_pairs`` fails independently; when a pair fails, both of
    its edge ids are added to the scenario.

    Args:
        batch_size: Number of scenarios to draw.
        p_low, p_high: Bounds of the uniform distribution the per-pair failure
            probabilities are drawn from.
        prob_mode: ``'fixed'`` draws one probability per pair and shares it
            across all scenarios of this call; ``'resample'`` draws a separate
            probability for every (scenario, pair).
        device: Torch device used for the random draws.

    Returns:
        A list of ``batch_size`` sets of failed edge ids.

    Raises:
        ValueError: If ``prob_mode`` is neither ``'fixed'`` nor ``'resample'``.
    """
    if prob_mode == 'fixed':
        prob_shape = (NUM_PAIRS,)
    elif prob_mode == 'resample':
        prob_shape = (batch_size, NUM_PAIRS)
    else:
        raise ValueError("prob_mode must be 'fixed' or 'resample'.")

    # Keep the draw order (probabilities first, then the uniforms) so that a
    # given seed produces the same scenarios.
    fail_prob = torch.empty(prob_shape, device=device).uniform_(p_low, p_high)
    draws = torch.rand(batch_size, NUM_PAIRS, device=device)
    failed = draws < fail_prob

    rows, cols = failed.nonzero(as_tuple=True)
    scenarios = [set() for _ in range(batch_size)]
    for row, col in zip(rows.cpu().numpy(), cols.cpu().numpy()):
        # A failed pair removes both directions of the link.
        scenarios[row].update(edge_pairs[col])
    return scenarios


[Main] Sampling device: cpu


In [4]:
# Cell 4
def build_arc_and_caches(failed_edges: set):
    """Build the time-expanded arcs and lookup caches for one failure scenario.

    Args:
        failed_edges: Edge ids that are unavailable; no travel arcs are
            generated for them.

    Returns:
        dict with the keys
        - "arc_list": list of ``(from, to, eid, tau, t0, t1)`` tuples where
          ``from``/``to`` are ``"<node>^<time>"`` strings. It holds travel arcs
          for the surviving edges, waiting arcs ``w_<node>_<w>`` and zero-length
          ``dummy_<node>`` arcs into the sink.
        - "arc_idx": arc tuple -> position in ``arc_list``.
        - "out_arcs" / "in_arcs": ``(node, time)`` -> arc positions leaving /
          entering that time-expanded node.
        - "node_out_arcs" / "node_in_arcs": node -> positions of travel arcs
          (waiting and dummy arcs excluded) leaving / entering the node.
        - "sched": train -> {node: scheduled arrival time along its route}.
          It is computed from the full ``edges`` dict, failed edges included.
        - "q_r": train -> total demand quantity.
        - "cap_map": ``(eid, time)`` -> travel-arc positions that occupy edge
          ``eid`` during ``[time, time + 1)``.

    Raises:
        KeyError: If a route uses a node pair for which no edge exists.
    """
    arc_list = []  # (from, to, eid, tau, t0, t1)
    for eid, (src, dst, tau) in edges.items():
        if eid in failed_edges:
            continue
        for t in range(T + 1 - tau):
            arc_list.append((f"{src}^{t}", f"{dst}^{t+tau}", eid, tau, t, t+tau))

    real_nodes = nodes - {SINK}
    for n in real_nodes:
        for w in range(1, max_wait+1):
            for t in range(T + 1 - w):
                arc_list.append((f"{n}^{t}", f"{n}^{t+w}", f"w_{n}_{w}", w, t, t+w))
    for n in real_nodes:
        for t in range(T+1):
            arc_list.append((f"{n}^{t}", f"{SINK}^{t}", f"dummy_{n}", 0, t, t))

    arc_idx = {info: i for i, info in enumerate(arc_list)}

    out_arcs = defaultdict(list)
    in_arcs  = defaultdict(list)
    node_in_arcs  = defaultdict(list)
    node_out_arcs = defaultdict(list)

    for k, (fr, to, eid, _, t0, t1) in enumerate(arc_list):
        # rsplit on the last "^" so a node name that itself contains "^"
        # still separates cleanly from the time suffix.
        n_fr, tt_fr = fr.rsplit("^", 1); tt_fr = int(tt_fr)
        n_to, tt_to = to.rsplit("^", 1); tt_to = int(tt_to)
        out_arcs[(n_fr, tt_fr)].append(k)
        in_arcs [(n_to, tt_to)].append(k)
        if not eid.startswith(("w_", "dummy")):
            node_out_arcs[n_fr].append(k)
            node_in_arcs [n_to].append(k)

    # One-time (src, dst) -> edge id lookup. setdefault keeps the first edge in
    # `edges` order, which is what a linear scan over `edges` would return.
    edge_between = {}
    for eid, (src, dst, _) in edges.items():
        edge_between.setdefault((src, dst), eid)

    # Scheduled arrival times along the planned route. Failed edges are kept
    # here on purpose: the schedule is the reference, rerouting/cancellation is
    # decided by the model.
    sched = {}
    for tr, path in routes_nodes.items():
        elapsed, arr = 0, {path[0]: dep_time[tr]}
        for u, v in zip(path[:-1], path[1:]):
            eid = edge_between.get((u, v))
            if eid is None:
                raise KeyError(f"route {tr!r}: no edge from {u!r} to {v!r}")
            elapsed += edges[eid][2]
            arr[v] = dep_time[tr] + elapsed
        sched[tr] = arr

    q_r = {tr: sum(q for *_, q in demand[tr]) for tr in trains}

    cap_map = defaultdict(list)
    for k, (fr, to, eid, _, t0, t1) in enumerate(arc_list):
        if eid.startswith(("w_", "dummy")):
            continue
        for tt in range(t0, t1):
            cap_map[(eid, tt)].append(k)

    return {
        "arc_list": arc_list,
        "arc_idx": arc_idx,
        "out_arcs": out_arcs,
        "in_arcs": in_arcs,
        "node_in_arcs": node_in_arcs,
        "node_out_arcs": node_out_arcs,
        "sched": sched,
        "q_r": q_r,
        "cap_map": cap_map,
    }

In [5]:
# Cell 5
def build_base_model(components, env):
    """Build the base rescheduling MIP (no capacity constraints) for one scenario.

    Capacity is not modelled here; solve_one_sample adds capacity cuts lazily.

    Variables (per train ``tr``):
        x[k, tr_i]   binary, train uses arc ``k`` of ``arc_list``.
        h[tr]        binary, 1 when the train departs (0 is counted as
                     cancelled by solve_one_sample).
        y[tr, n]     binary, the train leaves the network through the dummy
                     arcs of route node ``n``.
        s[tr, n]     binary, the train uses at least one travel arc touching
                     node ``n``.
        z[tr, o, d]  binary, both ``o`` and ``d`` are visited (and h = 1).
        delta[...]   continuous >= 0, lateness at ``d`` versus the schedule.
        t_arr[tr, n] integer in [0, T], arrival time at ``n``.

    Args:
        components: Dict returned by build_arc_and_caches.
        env: Started Gurobi environment the model is created in.

    Returns:
        ``(m, x, h, delta, z)``: the model and the variable containers needed
        by the caller.
    """
    arc_list      = components["arc_list"]
    out_arcs      = components["out_arcs"]
    in_arcs       = components["in_arcs"]
    node_in_arcs  = components["node_in_arcs"]
    node_out_arcs = components["node_out_arcs"]
    sched         = components["sched"]
    q_r           = components["q_r"]

    m = gp.Model(env=env)
    m.Params.OutputFlag = 0
    m.Params.Threads = 1

    nA, nT = len(arc_list), len(trains)
    real_nodes = nodes - {SINK}

    # Lookups built once instead of rescanning `trains` / `arc_list` inside the
    # constraint loops.
    train_pos = {tr: i for i, tr in enumerate(trains)}
    dummy_arcs = {}  # "dummy_<node>" -> positions of that node's sink arcs
    for k, arc in enumerate(arc_list):
        if arc[2].startswith("dummy_"):
            dummy_arcs.setdefault(arc[2], []).append(k)

    x = m.addVars(nA, nT, vtype=gp.GRB.BINARY, name="x")
    h = m.addVars(trains, vtype=gp.GRB.BINARY, name="h")

    y = {(tr, n): m.addVar(vtype=gp.GRB.BINARY) for tr in trains for n in routes_nodes[tr][1:]}
    s = {(tr, n): m.addVar(vtype=gp.GRB.BINARY) for tr in trains for n in real_nodes}
    z = {(tr, o, d): m.addVar(vtype=gp.GRB.BINARY) for tr in trains for (o, d, _) in demand[tr]}
    delta = {(tr, o, d): m.addVar(lb=0) for tr in trains for (o, d, _) in demand[tr]}
    t_arr = {(tr, n): m.addVar(lb=0, ub=T, vtype=gp.GRB.INTEGER) for tr in trains for n in real_nodes}

    # (2) Departure flow: a running train leaves its origin at its departure time.
    for tr_i, tr in enumerate(trains):
        r_o, t_dep = routes_nodes[tr][0], dep_time[tr]
        m.addConstr(x.sum(out_arcs[(r_o, t_dep)], tr_i) == h[tr])

    # (5) Flow conservation on every (node, time) except the departure event.
    for tr_i, tr in enumerate(trains):
        r_o, t_dep = routes_nodes[tr][0], dep_time[tr]
        for n in real_nodes:
            for t in range(T+1):
                inflow  = x.sum(in_arcs[(n, t)],  tr_i)
                outflow = x.sum(out_arcs[(n, t)], tr_i)
                if (n == r_o) and (t == t_dep):
                    m.addConstr(outflow - inflow == h[tr])
                else:
                    m.addConstr(inflow == outflow)

    # Terminal choice: exactly one route node (after the origin) feeds the sink
    # when the train runs, and the sink arcs of that node carry the flow.
    for tr_i, tr in enumerate(trains):
        cand = routes_nodes[tr][1:]
        m.addConstr(gp.quicksum(y[tr, n] for n in cand) == h[tr])
        for n in cand:
            idx_dum = dummy_arcs.get(f"dummy_{n}", [])
            m.addConstr(x.sum(idx_dum, tr_i) == y[tr, n])

    # s (visit): s = 1 exactly when some travel arc touching the node is used.
    for (tr, n), var in s.items():
        tr_i = train_pos[tr]
        idx = node_out_arcs[n] + node_in_arcs[n]
        flow = x.sum(idx, tr_i)
        m.addConstr(flow >= var)
        m.addConstr(flow <= BIG_M * var)

    # z-logic: z = s_o AND s_d, and only for running trains.
    for tr in trains:
        for (o, d, q) in demand[tr]:
            m.addConstr(z[tr, o, d] <= s[tr, o])
            m.addConstr(z[tr, o, d] <= s[tr, d])
            m.addConstr(z[tr, o, d] >= s[tr, o] + s[tr, d] - 1)
            m.addConstr(z[tr, o, d] <= h[tr])

    for tr_i, tr in enumerate(trains):
        # Arrival time: using a travel arc pins t_arr at its head node to the
        # arc's arrival time (big-M switched off when the arc is used).
        for k, (fr, to, eid, *_) in enumerate(arc_list):
            if eid.startswith(("w_", "dummy")):
                continue
            n_to, tt = to.rsplit("^", 1); tt = int(tt)
            m.addConstr(t_arr[tr, n_to] >= tt - BIG_M * (1 - x[k, tr_i]))
            m.addConstr(t_arr[tr, n_to] <= tt + BIG_M * (1 - x[k, tr_i]))
        # Delay: lateness versus the schedule when the OD pair is served,
        # forced to zero otherwise.
        for (o, d, q) in demand[tr]:
            sched_t = sched[tr].get(d, T)
            m.addConstr(delta[tr, o, d] >= t_arr[tr, d] - sched_t - BIG_M * (1 - z[tr, o, d]))
            m.addConstr(delta[tr, o, d] <= BIG_M * z[tr, o, d])

    obj  = gp.quicksum(w1 * q_r[tr] * (1 - h[tr]) for tr in trains)
    obj += gp.quicksum(w2 * q * (1 - z[tr, o, d]) for tr in trains for (o, d, q) in demand[tr])
    # Linear delay term. The constraint delta <= BIG_M * z above already gives
    # delta = 0 whenever z = 0, so delta * z equals delta at every feasible
    # point; dropping the product keeps the model a MILP instead of a
    # non-convex quadratic program.
    obj += gp.quicksum(w3 * q * delta[tr, o, d] for tr in trains for (o, d, q) in demand[tr])
    m.setObjective(obj)
    return m, x, h, delta, z


In [6]:
# Cell 6
def _make_env():
    """Create and start a silent, single-threaded Gurobi environment.

    Returns:
        The started ``gp.Env``; the caller is responsible for disposing it.
    """
    env = gp.Env(empty=True)
    for param_name, param_value in (('OutputFlag', 0), ('Threads', 1)):
        env.setParam(param_name, param_value)
    # Per-worker Gurobi log file (inspect when needed). Best effort only: a
    # failure to set it must not prevent the solve.
    # NOTE(review): OutputFlag=0 may also suppress the log file — confirm
    # against the Gurobi parameter documentation.
    try:
        env.setParam('LogFile', f'gurobi_{os.getpid()}.log')
    except gp.GurobiError:
        pass
    env.start()
    return env

def _log_worker(msg: str, sample_id: int = None):
    # 자식 프로세스에서 파일 로그 남기기 (부모 콘솔로는 잘 안 보임)
    pid = os.getpid()
    prefix = f"[PID {pid}]"
    if sample_id is not None:
        prefix += f"[S{sample_id}]"
    line = f"{prefix} {msg}\n"
    with open(f"worker_{pid}.log", "a", encoding="utf-8") as f:
        f.write(line)

def solve_one_sample(sample_id: int, failed_edges: set, time_limit=30):
    """Solve one failure scenario with lazily added capacity cuts.

    The base model is solved repeatedly; after each solve, every
    ``(edge, time)`` key of ``cap_map`` whose usage exceeds CAPACITY gets a
    capacity constraint, until no violation remains, the time budget is spent,
    or MAX_ITER solves were made. The time budget starts after model building.

    Args:
        sample_id: Identifier echoed in the result and in the worker log.
        failed_edges: Edge ids removed from the network for this scenario.
        time_limit: Total wall-clock budget in seconds across all solves.

    Returns:
        dict with "sample_id", "status" (Gurobi status code), "obj" (objective
        value when the last solve was optimal, else None), "cancelled",
        "delayed", "truncated" (train counts, or None when no solution was
        available), "failed_edges_count" and "walltime_sec".
        On any exception: ``{"sample_id", "status": -1, "obj": None, "error"}``.
    """
    env = None
    m = None
    try:
        _log_worker("start", sample_id)
        env = _make_env()
        comp = build_arc_and_caches(failed_edges)
        m, x, h, delta, z = build_base_model(comp, env)

        arc_list = comp["arc_list"]
        cap_map  = comp["cap_map"]
        n_arcs, n_trains = len(arc_list), len(trains)

        MAX_ITER, EPS = 40, 1e-6

        t0 = time.time()
        active_prev = None      # (arc, train) pairs that were 1 in the previous solution
        has_solution = False
        for it in range(MAX_ITER):
            # Never hand Gurobi a non-positive limit, even when the budget is spent.
            remain = max(0.05, time_limit - (time.time() - t0))
            m.Params.TimeLimit = remain

            if active_prev:
                # Warm start from the previous solution's active arcs.
                for idx, ti in active_prev:
                    x[idx, ti].Start = 1

            _log_worker(f"optimize iter={it}, remain={remain:.2f}s", sample_id)
            m.optimize()

            status = int(m.Status)
            _log_worker(f"status={status}", sample_id)

            # Variable values (.X) exist only when the solver holds a solution;
            # stop cleanly instead of raising when it has none.
            has_solution = m.SolCount > 0
            if not has_solution:
                _log_worker(f"stop iter={it}, no solution available", sample_id)
                break

            x_mat = np.zeros((n_arcs, n_trains), dtype=np.uint8)
            for idx in range(n_arcs):
                for ti in range(n_trains):
                    if int(round(x[idx, ti].X)):
                        x_mat[idx, ti] = 1
            viol = [key for key, idx_list in cap_map.items()
                    if x_mat[idx_list].sum() > CAPACITY + EPS]

            # Decide whether to stop or to add cuts and solve again.
            out_of_time = (time.time() - t0) >= time_limit
            if not viol or out_of_time:
                if out_of_time or status in (gp.GRB.OPTIMAL, gp.GRB.TIME_LIMIT):
                    _log_worker(f"stop iter={it}, viol={len(viol)}", sample_id)
                else:
                    _log_worker(f"stop-other-status iter={it}, no viol", sample_id)
                break

            # Time is left and capacity is violated: add the violated cuts.
            for (eid, tt) in viol:
                expr = gp.quicksum(x[idx, ti] for idx in cap_map[(eid, tt)]
                                   for ti in range(n_trains))
                m.addConstr(expr <= CAPACITY, name=f"cap_{eid}_{tt}_{it}")
            active_prev = [(int(idx), int(ti)) for idx, ti in zip(*np.nonzero(x_mat))]

        if has_solution:
            running = [tr for tr in trains if h[tr].X >= 0.5]
            late = {tr for tr in running
                    if any(delta[tr, o, d].X > 1e-6 for (o, d, _) in demand[tr])}
            cancelled = sum(1 for tr in trains if h[tr].X < 0.5)
            delayed   = len(late)
            # Running, on time, but at least one OD pair is not served.
            truncated = sum(1 for tr in running if tr not in late and
                            any(z[tr, o, d].X < 0.5 for (o, d, _) in demand[tr]))
        else:
            cancelled = delayed = truncated = None

        res = {
            "sample_id": sample_id,
            "status": int(m.Status),
            "obj": (float(m.ObjVal) if m.Status == gp.GRB.OPTIMAL else None),
            "cancelled": cancelled, "delayed": delayed, "truncated": truncated,
            "failed_edges_count": len(failed_edges),
            "walltime_sec": round(time.time() - t0, 3),
        }
        _log_worker(f"done wall={res['walltime_sec']}s status={res['status']}", sample_id)
        return res

    except Exception as e:
        _log_worker(f"exception: {repr(e)}", sample_id)
        return {"sample_id": sample_id, "status": -1, "obj": None, "error": repr(e)}

    finally:
        # Release Gurobi resources on the error path as well as on success.
        if m is not None:
            m.dispose()
        if env is not None:
            env.dispose()


In [None]:
# Cell 7 — Pool version (spawn + maxtasksperchild=1)
import multiprocessing as mp
import time
import pandas as pd

def _init_worker(seed=None):
    # 필요시 시드/환경 설정
    np.random.seed(GLOBAL_SEED)
    torch.manual_seed(GLOBAL_SEED)

# ★ Wrapper: unpacks the argument tuple and forwards it to solve_one_sample
def _solve_wrapper(arg):
    """Call solve_one_sample with the positional arguments packed in ``arg``.

    Pool.imap_unordered passes a single object per task, so the per-sample
    arguments travel as one tuple and are expanded here.
    """
    positional = tuple(arg)
    outcome = solve_one_sample(*positional)
    return outcome

def run_parallel_sampling_pool(total_samples=10,
                               concurrent=2,
                               prob_mode='fixed',
                               time_limit=20,
                               p_low=0.05, p_high=0.20):
    """
    Sample failure scenarios and solve them in parallel with a spawn-context Pool.

    Each worker is restarted after a single task (maxtasksperchild=1) to limit
    resource leaks and license conflicts.

    Args:
        total_samples: Number of scenarios to sample and solve.
        concurrent: Number of worker processes.
        prob_mode: Passed to sample_failed_sets ('fixed' or 'resample').
        time_limit: Per-sample solve budget in seconds.
        p_low, p_high: Bounds of the per-pair failure probability.

    Returns:
        DataFrame with one row per sample, sorted by "sample_id". It is empty
        (with only a "sample_id" column) when ``total_samples`` is not positive.
    """
    if total_samples <= 0:
        # Nothing to run; sorting an empty result by "sample_id" would fail.
        return pd.DataFrame(columns=["sample_id"])

    # Spawn-started children must be able to import the worker functions. When
    # they live in an interactive __main__ (notebook/REPL, no __file__), the
    # children cannot find them and the loop below can wait forever.
    if _solve_wrapper.__module__ == "__main__" and "__file__" not in globals():
        print("[Main] WARNING: worker functions are defined in an interactive "
              "__main__; spawn-started workers may be unable to import them and "
              "the pool can hang. Move the worker code into an importable .py module.")

    try:
        mp.set_start_method("spawn")
    except RuntimeError:
        # The start method was already fixed for this interpreter. The pool
        # below uses an explicit spawn context, so this is safe to ignore.
        pass

    ctx = mp.get_context("spawn")
    failed_sets = sample_failed_sets(total_samples, p_low, p_high, prob_mode)

    # One (sample_id, failed_set, time_limit) tuple per task.
    args = [(sid, failed_sets[sid], time_limit) for sid in range(total_samples)]

    start = time.time()
    results = []
    print(f"[Main] start: total={total_samples}, concurrent={concurrent}, time_limit={time_limit}s")

    progress_step = max(1, total_samples // 10)  # report roughly every 10%
    with ctx.Pool(processes=concurrent,
                  initializer=_init_worker,
                  maxtasksperchild=1) as pool:

        # The wrapper unpacks each tuple, since imap_unordered passes one object per task.
        for i, res in enumerate(pool.imap_unordered(_solve_wrapper, args, chunksize=1), 1):
            results.append(res)
            if i % progress_step == 0 or i == total_samples:
                print(f"[Main] progress: {i}/{total_samples} done")

    elapsed = time.time() - start
    print(f"[Main] Done {len(results)} samples in {elapsed:.1f}s (concurrent={concurrent}).")

    df = pd.DataFrame(results).sort_values("sample_id").reset_index(drop=True)
    if "error" in df.columns and df["error"].notna().any():
        print("[Main] Some workers returned errors. First few:")
        print(df[df["error"].notna()].head())
    return df

# ⚠️ On Windows/Jupyter, always use the main guard!
if __name__ == "__main__":
    mp.freeze_support()  # Windows safeguard
    # Start with a small case and scale up gradually
    df_res = run_parallel_sampling_pool(total_samples=2, concurrent=2, prob_mode='fixed', time_limit=20)
    display(df_res.head())


[Main] start: total=2, concurrent=2, time_limit=20s


In [None]:
# Cell 8
# Run 10,000 samples with 100 concurrent workers (example)
# df_all = run_parallel_sampling_pool(total_samples=10000, concurrent=100, prob_mode='fixed', time_limit=20)
# df_all.to_csv("mcs_results_10k.csv", index=False)
# display(df_all.describe())