# 单品 AAR 鲁棒库存优化（14天决策周期）

本笔记实现基于 Ben-Tal 等（2004）方法思想的“仿射可调鲁棒”库存优化（AAR）。
- 以14天作为一个决策周期（之前分析显示14天聚合误差更小）。
- 订购时点由二进制变量 `v_k` 决定；订单量采用仿射策略：`u_k = u_{k0} + \sum_{t<k} a_{k,t} (d_t - \hat d_t)`。
- 采用“箱型（Box）不确定集”：`d_t = \hat d_t \pm \Delta_t`，并通过场景极点（±）近似最坏情形。
- 额外提供名义/高低/随机场景模拟来评估策略表现。

若本地找不到“未来需求预测”CSV，将自动构造一个可复现示例数据（可在参数区替换为真实预测）。

In [None]:
# 安装/导入依赖（若缺少将尝试安装）
import sys, subprocess, json, os

def ensure(pkg):
    try:
        __import__(pkg)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

for p in ["pandas", "numpy", "pulp", "matplotlib", "seaborn"]:
    ensure(p)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pulp as pl

pd.set_option('display.max_columns', 120)
np.random.seed(42)

print("依赖加载完成")

In [None]:
# 参数区：成本、不确定集、周期等
PRODUCT = "电解镍"  # 单品名（仅用于输出标注）
T_days = 14          # 决策周期长度（天）
K_periods = 8        # 优化期内的14天块数（例如 8*14=112天）

# 成本参数（可按需调整/接入真实值）
h = 1.0     # 单位期末库存持有成本（每14天块）
p = 10.0    # 单位缺货惩罚成本（backorder/未满足罚金）
c = 0.0     # 单位订购变动成本（每单位）
f = 100.0   # 固定订购成本（每次下单）
M = 1e6     # Big-M（足够大）

# 不确定集幅度（按14天聚合后的名义需求的比例设定区间）
# 例如 delta_ratio=0.2 表示 d_k ∈ [d_hat_k*(1-0.2), d_hat_k*(1+0.2)]
delta_ratio = 0.2

# 初始库存
I1 = 0.0


In [None]:
# 读取/构造14天聚合需求预测

# 自动探测可能的预测输出文件名
candidate_files = [
    "all_future_predictions.csv",
]

# 也尝试从笔记本里之前输出常见命名
# 用户仓库中之前的notebook输出路径位于本机其他目录，这里仅尝试本目录

forecast_df = None
for name in candidate_files:
    if os.path.exists(name):
        try:
            df = pd.read_csv(name)
            if 'date' in df.columns and 'prediction' in df.columns:
                forecast_df = df.copy()
                break
        except Exception:
            pass

if forecast_df is None:
    # 回退：构造一个示例每日名义预测（均值 + 周期扰动）
    days = T_days * K_periods
    date_index = pd.date_range(start="2025-01-01", periods=days, freq="D")
    base = 1000
    season = 200*np.sin(np.arange(days)/7*2*np.pi)
    noise = np.random.normal(0, 50, size=days)
    pred = np.maximum(base + season + noise, 100)
    forecast_df = pd.DataFrame({
        'date': date_index,
        'prediction': pred
    })

# 14天聚合
forecast_df['date'] = pd.to_datetime(forecast_df['date'])
forecast_df = forecast_df.sort_values('date')
forecast_df['period'] = ((forecast_df['date'] - forecast_df['date'].min()).dt.days // T_days) + 1
agg = forecast_df.groupby('period', as_index=False)['prediction'].sum()

# 只取前 K_periods 个块
agg = agg.head(K_periods)

# 名义需求与不确定界
d_hat = agg['prediction'].to_numpy()
delta = delta_ratio * d_hat

print(f"检测到 {len(forecast_df)} 天预测，聚合为 {len(agg)} 个14天周期。")
agg.head()

In [None]:
# 构建仿射可调鲁棒库存模型（简化版：极点场景法近似）
# 决策：
#  - v_k ∈ {0,1}: 第k块是否下单
#  - u_k0: 第k块名义下单量的常数项
#  - a_{k,t}: 仿射系数（t<k），表示对第t块需求偏差的线性响应
#  - y_k^+ / y_k^-: 期末库存与缺货（backorder），用分解变量逼近 |I_k^-|
# 库存动态：I_k = I_{k-1} + u_k - d_k
# 这里将 d_k = d_hat_k + z_k，z_k ∈ [-delta_k, +delta_k]，并以极点场景 z_k = ±delta_k 形成有限场景集合

K = len(d_hat)

# 生成极点场景：每个周期取 z_k ∈ { -delta_k, +delta_k }
# 为控制规模，采用“笛卡尔抽样的子集”：
#  - 全正、全负
#  - 单点翻转（每次只对一个k取相反号）
scenarios = []
scenarios.append(+delta.copy())            # 全正
scenarios.append(-delta.copy())            # 全负
for flip in range(K):                      # 单点翻转
    z = +delta.copy()
    z[flip] = -delta[flip]
    scenarios.append(z)
for flip in range(K):
    z = -delta.copy()
    z[flip] = +delta[flip]
    scenarios.append(z)

print(f"鲁棒极点场景数: {len(scenarios)}")

# 构建 MILP
m = pl.LpProblem("AAR_Robust_Inventory_14d", pl.LpMinimize)

# 变量
v = pl.LpVariable.dicts('v', list(range(1, K+1)), lowBound=0, upBound=1, cat=pl.LpBinary)
u0 = pl.LpVariable.dicts('u0', list(range(1, K+1)), lowBound=0)  # 常数项非负
# a_{k,t} 仅定义 t<k
A = {}
for k in range(1, K+1):
    for t in range(1, k):
        A[(k,t)] = pl.LpVariable(f"a_{k}_{t}", lowBound=None)  # 系数可正可负

# 对每个场景、每期的库存与缺货分解变量
I = {}
Ypos = {}
Yneg = {}
for s_idx, z in enumerate(scenarios):
    for k in range(1, K+1):
        I[(s_idx,k)] = pl.LpVariable(f"I_s{s_idx}_k{k}", lowBound=None)
        Ypos[(s_idx,k)] = pl.LpVariable(f"Ypos_s{s_idx}_k{k}", lowBound=0)
        Yneg[(s_idx,k)] = pl.LpVariable(f"Yneg_s{s_idx}_k{k}", lowBound=0)

# 为了避免仿射策略在极端下单，限定 u_k 的上界与Big-M和v_k关联
# 这里引入辅助变量 U_k(s) 表示该场景下的实际下单量，用线性表达式定义
U = {}
for s_idx, z in enumerate(scenarios):
    for k in range(1, K+1):
        U[(s_idx,k)] = pl.LpVariable(f"U_s{s_idx}_k{k}", lowBound=0)

# 目标：在所有场景上最坏情形成本最小化（minimize W），并对每个场景施加成本不超过 W
W = pl.LpVariable("W", lowBound=0)

# 成本分解：每个场景成本 = sum_k ( f*v_k + c*U_k + h*Ypos_k + p*Yneg_k )
# 注意：f*v_k 对所有场景相同，计入一次即可，但用“每场景成本≤W”时重复无害（上界更强）
for s_idx, z in enumerate(scenarios):
    scen_cost = []
    for k in range(1, K+1):
        scen_cost.append(f * v[k])
        scen_cost.append(c * U[(s_idx,k)])
        scen_cost.append(h * Ypos[(s_idx,k)])
        scen_cost.append(p * Yneg[(s_idx,k)])
    m += pl.lpSum(scen_cost) <= W, f"worst_cost_bound_s{s_idx}"

# 库存动态与 |I_k| 分解
for s_idx, z in enumerate(scenarios):
    # I_0 = I1（常数）
    m += I[(s_idx,1)] == I1 + U[(s_idx,1)] - (d_hat[0] + z[0]), f"inv_balance_s{s_idx}_k1"
    for k in range(2, K+1):
        # u_k = u0_k + sum_{t<k} a_{k,t} * z_t
        expr_u = u0[k] + pl.lpSum(A[(k,t)] * z[t-1] for t in range(1, k))
        m += U[(s_idx,k)] == expr_u, f"affine_u_s{s_idx}_k{k}"
        m += I[(s_idx,k)] == I[(s_idx,k-1)] + U[(s_idx,k)] - (d_hat[k-1] + z[k-1]), f"inv_balance_s{s_idx}_k{k}"
    # |I_k| 分解成 Ypos, Yneg，并且 I_k = Ypos - Yneg
    for k in range(1, K+1):
        m += I[(s_idx,k)] == Ypos[(s_idx,k)] - Yneg[(s_idx,k)], f"abs_split_s{s_idx}_k{k}"

# 订购触发与Big-M：U_k <= M * v_k；同时定义首期 u0_1 直接等于 U_s,k=1 的名义项（无历史z）
for s_idx, z in enumerate(scenarios):
    for k in range(1, K+1):
        m += U[(s_idx,k)] <= M * v[k], f"order_onoff_s{s_idx}_k{k}"

# 约束：k=1 的仿射下单量仅为常数项
for s_idx, z in enumerate(scenarios):
    m += U[(s_idx,1)] == u0[1], f"u_first_s{s_idx}"

# 目标
m += W

print("模型已构建。变量数：", len(m.variables()))

In [None]:
# 求解模型
solver = pl.PULP_CBC_CMD(msg=False, timeLimit=60)
res = m.solve(solver)
print("Status:", pl.LpStatus[res])
print("最坏情形成本 W:", pl.value(W))

v_sol = np.array([pl.value(v[k]) for k in range(1, K+1)])
u0_sol = np.array([pl.value(u0[k]) for k in range(1, K+1)])

print("v(是否下单):", v_sol.astype(int))
print("u0(名义常数项):", np.round(u0_sol, 2))


In [None]:
# 结果整理与可视化（名义场景）
# 名义场景取 z=0，则 u_k = u0_k；需求取 d_hat
U_nom = u0_sol.copy()
I_nom = np.zeros(K)
Ypos_nom = np.zeros(K)
Yneg_nom = np.zeros(K)

I_nom[0] = I1 + U_nom[0] - d_hat[0]
for k in range(1, K):
    I_nom[k] = I_nom[k-1] + U_nom[k] - d_hat[k]

Ypos_nom = np.maximum(I_nom, 0)
Yneg_nom = np.maximum(-I_nom, 0)

cost_nom = np.sum(f * v_sol + c * U_nom + h * Ypos_nom + p * Yneg_nom)
print(f"名义场景总成本: {cost_nom:.2f}")

fig, ax = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
ax[0].bar(np.arange(1,K+1), d_hat, label='14天聚合需求(名义)')
ax[0].bar(np.arange(1,K+1), U_nom, alpha=0.7, label='下单量(名义仿射)')
ax[0].legend()
ax[0].set_ylabel('数量')

ax[1].plot(np.arange(1,K+1), I_nom, marker='o', label='期末库存(正为库存,负为缺口)')
ax[1].axhline(0, color='k', linewidth=1)
ax[1].legend()
ax[1].set_xlabel('第k个14天周期')
ax[1].set_ylabel('库存')
plt.tight_layout()
plt.show()

In [None]:
# 简单场景模拟：全正/全负

def simulate(z):
    # 构造仿射下单量：u_k = u0_k + sum_{t<k} a_{k,t} z_t
    # 由于我们在求解阶段没有导出 A 系数到名义可视化，这里提取 A 解
    A_sol = np.zeros((K, K))
    for k in range(1, K+1):
        for t in range(1, k):
            A_sol[k-1, t-1] = pl.value(A[(k,t)]) if (k,t) in A else 0.0
    u = u0_sol.copy()
    for k in range(2, K+1):
        u[k-1] = u0_sol[k-1] + np.dot(A_sol[k-1, :k-1], z[:k-1])
    u = np.maximum(u, 0.0)

    I = np.zeros(K)
    I[0] = I1 + u[0] - (d_hat[0] + z[0])
    for k in range(1, K):
        I[k] = I[k-1] + u[k] - (d_hat[k] + z[k])
    y_pos = np.maximum(I, 0)
    y_neg = np.maximum(-I, 0)

    cost = np.sum(f * v_sol + c * u + h * y_pos + p * y_neg)
    return u, I, cost

z_plus = +delta.copy()
z_minus = -delta.copy()

u_p, I_p, cost_p = simulate(z_plus)
u_m, I_m, cost_m = simulate(z_minus)

print(f"全正场景成本: {cost_p:.2f}")
print(f"全负场景成本: {cost_m:.2f}")

In [None]:
# 导出关键结果
result_df = pd.DataFrame({
    'period': np.arange(1, K+1),
    'd_hat_14d': d_hat,
    'order_flag_v': v_sol.astype(int),
    'u0_nominal': u0_sol,
    'I_nom_end': I_nom,
})
result_df['order_qty_nominal'] = np.where(result_df['order_flag_v']>0.5, result_df['u0_nominal'], 0.0)

print("订购计划（名义）预览：")
display(result_df.head(12))

out_csv = 'AAR_14days_plan.csv'
result_df.to_csv(out_csv, index=False)
print(f"已保存: {out_csv}")

## 备注
- 本实现采用“极点场景法”近似箱型不确定集下的 AAR 最坏情形。相较论文中的对偶/绝对值等价重整，这种方法更通用但可能带来更大的场景规模；这里通过“全正/全负 + 单点翻转”控制规模。
- 若要完全复刻 Ben-Tal et al. (2004) 的等价重整形式，需要将目标与约束按“常数项与不确定项分解”，对每个线性不等式引入左右 envelope 变量（类似文中式(18)-(25)），并对每个包含不确定变量的线性组合添加相应的上界变量与绝对值束缚。
- 实务上还可增加：最大库存容量、最小起订量、提前期、服务水平（CVaR）等扩展约束。

In [None]:
# 随机多场景蒙特卡洛评估（围绕 Box 内均匀采样）
num_mc = 50
rng = np.random.default_rng(123)
mc_costs = []
for _ in range(num_mc):
    z = rng.uniform(low=-delta, high=+delta)
    u, I, cost = simulate(z)
    mc_costs.append(cost)

print(f"蒙特卡洛成本均值: {np.mean(mc_costs):.2f}, 标准差: {np.std(mc_costs):.2f}")
plt.figure(figsize=(8,4))
plt.hist(mc_costs, bins=12, alpha=0.7)
plt.axvline(np.mean(mc_costs), color='r', linestyle='--', label='均值')
plt.title('蒙特卡洛场景成本分布')
plt.legend()
plt.show()


## 完全AAR（对偶重构，Box不确定集）
实现 Ben-Tal 等（2004）风格的等价线性化：
- 将目标改写为 W 的上界约束，并为 z 系数引入对偶变量 `n_t`：`W ≥ 常数项 + ∑ δ_t n_t`，且 `-n_t ≤ 目标中的 z 系数 ≤ n_t`。
- 对约束 `y_h(k;z) ≥ h·I_k(z)` 与 `y_p(k;z) ≥ p·(-I_k(z))`，对每个 k 引入包络变量 `s^h_{k,t}`、`s^p_{k,t}`，并施加 `常数差 ≥ ∑ δ_t s` 与 `-s ≤ 系数差 ≤ s`。 
- 对 `0 ≤ u_k(z) ≤ M v_k` 施加鲁棒界：`u0_k − ∑ δ_t x_{k,t} ≥ 0`，`u0_k + ∑ δ_t x_{k,t} ≤ M v_k`，以及 `−x_{k,t} ≤ u_{k,t} ≤ x_{k,t}`。

In [None]:
# 构建完全AAR（对偶重构）MILP（Box集）
# 记号对齐：
# - y_k 表示持有库存代价项的“包络常数”（非场景变量），s_h[k,t] 包络 z 系数；同理 p-侧用 y'_k 与 s_p[k,t]
# - 我们不再显式枚举场景 z，而是对所有 z∈Box 同时保证约束成立

m2 = pl.LpProblem("AAR_Exact_Dual_Box", pl.LpMinimize)

# 决策
v2 = pl.LpVariable.dicts('v', list(range(1, K+1)), lowBound=0, upBound=1, cat=pl.LpBinary)
u0_2 = pl.LpVariable.dicts('u0', list(range(1, K+1)), lowBound=0)
A2 = {}
for k in range(1, K+1):
    for t in range(1, k):
        A2[(k,t)] = pl.LpVariable(f"a_{k}_{t}", lowBound=None)

# 目标对偶变量
W2 = pl.LpVariable('W', lowBound=0)
n_t = pl.LpVariable.dicts('n', list(range(1, K+1)), lowBound=0)  # 绝对值上线

# y_h, y_p 的常数项包络，以及各自对应的系数包络 s_h, s_p
# 直观：对约束 "y_h(k) ≥ h * I_k(z)"，展开 I_k(z) = I1 + sum_{i≤k} u_i(z) − d_hat_k − z_k
# 将 z 系数对应到 s_h[k,t]
y_h0 = pl.LpVariable.dicts('y_h0', list(range(1, K+1)), lowBound=0)
y_p0 = pl.LpVariable.dicts('y_p0', list(range(1, K+1)), lowBound=0)

s_h = {}
s_p = {}
for k in range(1, K+1):
    for t in range(1, k+1):  # z_t 影响至 k
        s_h[(k,t)] = pl.LpVariable(f"s_h_{k}_{t}", lowBound=0)
        s_p[(k,t)] = pl.LpVariable(f"s_p_{k}_{t}", lowBound=0)

# 鲁棒界化 u 的上下界
x_kt = {}
for k in range(1, K+1):
    for t in range(1, k):
        x_kt[(k,t)] = pl.LpVariable(f"x_{k}_{t}", lowBound=0)
        # −x ≤ a ≤ x
        m2 += -x_kt[(k,t)] <= A2[(k,t)]
        m2 += A2[(k,t)] <= x_kt[(k,t)]
    # 下界：u_k(z) ≥ 0 ⇒ u0_k + ∑_{t<k} a_{k,t} z_t ≥ 0 for all |z_t|≤δ_t
    # 等价为 u0_k − ∑_{t<k} δ_t x_{k,t} ≥ 0
    if k == 1:
        m2 += u0_2[1] >= 0
    else:
        m2 += u0_2[k] - pl.lpSum(delta[t-1]*x_kt[(k,t)] for t in range(1,k)) >= 0
    # 上界：u_k(z) ≤ M v_k ⇒ u0_k + ∑ a z ≤ M v_k ⇒ u0_k + ∑ δ_t |a| ≤ M v_k
    if k == 1:
        m2 += u0_2[1] <= M * v2[1]
    else:
        m2 += u0_2[k] + pl.lpSum(delta[t-1]*x_kt[(k,t)] for t in range(1,k)) <= M * v2[k]

# 库存表达：I_k(z) 递推，将常数项与 z 系数分开
# I_k(z) = C_k + ∑ B_{k,t} z_t
C = np.zeros(K)
B = np.zeros((K, K))  # 仅用于构造线性约束表达式的系数，不是变量
# 但 I_k(z) 依赖 A2 与 u0_2，本身不是常数；我们在约束中直接写出 y_h0 与 s_h 与右侧表达式的关系

# 对每个 k，构造 I_k 的常数项/系数项表达：
# I_1: C1 = I1 + u0_1 − d_hat1；B1,1 = −1；B1,t>1 = 0
# I_k: Ck = C_{k-1} + u0_k − d_hat_k；Bk,t = B_{k-1,t} + a_{k,t} (t<k)，对 t=k 有 Bk,k = −1
# 我们用符号方式在约束里展开，而不显式存 B（因为 a_{k,t} 是变量）。

# y_h 包络： y_h0[k] + ∑ δ_t s_h[k,t] ≥ h * (常数项)；且 −s_h ≤ (z系数·h) ≤ s_h
# y_p 包络（缺货）： y_p0[k] + ∑ δ_t s_p[k,t] ≥ p * (−常数项)；且 −s_p ≤ (−z系数·p) ≤ s_p

# 预先为递推构造辅助：I_k 的常数项符号表达（线性变量和常数）
# C_k_var = I1 + ∑_{i≤k} u0_i − ∑_{i≤k} d_hat_i
cum_dhat = np.cumsum(d_hat)

for k in range(1, K+1):
    # 常数项：I1 + sum_{i<=k} u0_i − sum_{i<=k} d_hat_i
    const_expr = I1 + pl.lpSum(u0_2[i] for i in range(1, k+1)) - cum_dhat[k-1]

    # z 系数：对于任一 t≤k，系数 = sum_{i=max(2,t+1)}^{k} a_{i,t}  − 1_{t≤k 且 t==k}
    # 解释：z_t 影响所有 i>t 的 u_i，通过 a_{i,t}；且 d_hat_k 对应的 z_k 系数为 −1
    coeffs = {}
    for t in range(1, k+1):
        s = 0
        for i in range(max(2, t+1), k+1):
            if (i,t) in A2:
                s += A2[(i,t)]
        if t == k:
            s += -1
        coeffs[t] = s

    # y_h 包络
    m2 += y_h0[k] + pl.lpSum(delta[t-1]*s_h[(k,t)] for t in range(1,k+1)) >= h * const_expr
    for t in range(1, k+1):
        m2 += -s_h[(k,t)] <= h * coeffs[t]
        m2 += h * coeffs[t] <= s_h[(k,t)]

    # y_p 包络（对 p·(-I_k)）
    # -I_k 的常数项为 -const_expr；z 系数为 -coeffs[t]
    m2 += y_p0[k] + pl.lpSum(delta[t-1]*s_p[(k,t)] for t in range(1,k+1)) >= p * (-const_expr)
    for t in range(1, k+1):
        m2 += -s_p[(k,t)] <= -p * coeffs[t]
        m2 += -p * coeffs[t] <= s_p[(k,t)]

# 目标：W2 ≥ sum_k( f v_k + c u0_k + y_h0[k] + y_p0[k] ) + ∑ δ_t n_t
# 同时对目标中每个 t 的 z 系数的绝对值用 n_t 约束
# 目标的 z 系数来自：∑_k c* (∑_{i>t} a_{i,t}) + 0（y_h0,y_p0无 z）；固定费与常数项不含 z
for t in range(1, K+1):
    coeff_t = pl.lpSum(A2[(i,t)] for i in range(max(2, t+1), K+1) if (i,t) in A2)
    m2 += -n_t[t] <= c * coeff_t
    m2 += c * coeff_t <= n_t[t]

m2 += W2
m2 += W2 >= pl.lpSum(f * v2[k] + c * u0_2[k] + y_h0[k] + y_p0[k] for k in range(1, K+1)) \
          + pl.lpSum(delta[t-1]*n_t[t] for t in range(1, K+1))

print("完全AAR（对偶重构）模型已构建。变量数：", len(m2.variables()))

In [None]:
# 求解完全AAR模型
solver = pl.PULP_CBC_CMD(msg=False, timeLimit=120)
res2 = m2.solve(solver)
print("Status:", pl.LpStatus[res2])
print("W2 (最坏情形成本上界):", pl.value(W2))

v2_sol = np.array([pl.value(v2[k]) for k in range(1, K+1)])
u0_2_sol = np.array([pl.value(u0_2[k]) for k in range(1, K+1)])
print("v2:", v2_sol.astype(int))
print("u0_2:", np.round(u0_2_sol, 2))

## 普通静态鲁棒（基线）
- 下单量不随需求偏差调整：`u_k` 固定，不含仿射系数。
- 对 Box 集 `d_k ∈ [d_hat_k ± δ_k]`，直接对约束做最坏情形鲁棒化：
  - `u_k` 上下界（用 `v_k` 控制）与库存平衡通过对偶包络变量实现，形式同上但删除所有 `a_{k,t}` 相关项。

In [None]:
# 基线：静态鲁棒（无仿射项）
m3 = pl.LpProblem("Static_Robust_Box", pl.LpMinimize)

v3 = pl.LpVariable.dicts('v', list(range(1, K+1)), lowBound=0, upBound=1, cat=pl.LpBinary)
u3 = pl.LpVariable.dicts('u', list(range(1, K+1)), lowBound=0)

# 目标对偶
W3 = pl.LpVariable('W', lowBound=0)
n3_t = pl.LpVariable.dicts('n', list(range(1, K+1)), lowBound=0)

y3_h0 = pl.LpVariable.dicts('y_h0', list(range(1, K+1)), lowBound=0)
y3_p0 = pl.LpVariable.dicts('y_p0', list(range(1, K+1)), lowBound=0)

s3_h = {}
s3_p = {}
for k in range(1, K+1):
    for t in range(1, k+1):
        s3_h[(k,t)] = pl.LpVariable(f"s3_h_{k}_{t}", lowBound=0)
        s3_p[(k,t)] = pl.LpVariable(f"s3_p_{k}_{t}", lowBound=0)

# u 上下界（鲁棒化后）
for k in range(1, K+1):
    m3 += u3[k] >= 0
    m3 += u3[k] <= M * v3[k]

# I_k(z) 常数项与 z 系数（此处无 a_{k,t}，仅 z_k 直接进入）
cum_dhat = np.cumsum(d_hat)
for k in range(1, K+1):
    const_expr = I1 + pl.lpSum(u3[i] for i in range(1, k+1)) - cum_dhat[k-1]
    # z 系数：只有 t=k 时为 −1，其余 0
    # y_h 包络
    m3 += y3_h0[k] + delta[k-1]*s3_h[(k,k)] >= h * const_expr
    m3 += -s3_h[(k,k)] <= -h
    m3 += -h <= s3_h[(k,k)]
    for t in range(1, k):
        # 这些 s3_h 可以为 0，不强加约束，但保留变量避免约束缺失
        m3 += s3_h[(k,t)] >= 0
    # y_p 包络
    m3 += y3_p0[k] + delta[k-1]*s3_p[(k,k)] >= p * (-const_expr)
    m3 += -s3_p[(k,k)] <= +p
    m3 += +p <= s3_p[(k,k)]
    for t in range(1, k):
        m3 += s3_p[(k,t)] >= 0

# 目标：W3 ≥ ∑(f v + c u + y_h0 + y_p0) + ∑ δ_t n_t；目标 z 系数全为0 ⇒ n_t 可为0
for t in range(1, K+1):
    m3 += n3_t[t] >= 0  # 冗余占位

m3 += W3
m3 += W3 >= pl.lpSum(f*v3[k] + c*u3[k] + y3_h0[k] + y3_p0[k] for k in range(1, K+1)) \
         + pl.lpSum(delta[t-1]*n3_t[t] for t in range(1, K+1))

print("静态鲁棒模型已构建。变量数：", len(m3.variables()))

In [None]:
# 求解基线模型并对比
res3 = m3.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=120))
print("Static Status:", pl.LpStatus[res3])
print("W3:", pl.value(W3))

v3_sol = np.array([pl.value(v3[k]) for k in range(1, K+1)])
u3_sol = np.array([pl.value(u3[k]) for k in range(1, K+1)])

# 汇总对比
compare = pd.DataFrame({
    'period': np.arange(1, K+1),
    'd_hat_14d': d_hat,
    'AAR_v': v2_sol.astype(int),
    'AAR_u0': u0_2_sol,
    'Static_v': v3_sol.astype(int),
    'Static_u': u3_sol,
})
compare.head(12)

### 修复说明（重要）
此前“完全AAR（对偶重构）”与“静态鲁棒基线”的目标函数遗漏了包络项中的 `∑ δ_t · s`，导致模型可以无成本地让 `s` 变大以满足鲁棒约束，从而得到 `W=0`、`u=0` 的伪最优。下面给出修正后的两个模型：
- 在目标中显式加入 `y_h0 + ∑ δ s_h` 与 `y_p0 + ∑ δ s_p`；
- 保持 `c·u` 的鲁棒项采用 `∑ δ n_t`（对 `a_{k,t}` 的 z 系数包络）。

In [None]:
# 重新构建完全AAR（修正目标包含 ∑δ·s）
m2_fix = pl.LpProblem("AAR_Exact_Dual_Box_Fix", pl.LpMinimize)

v2f = pl.LpVariable.dicts('v', list(range(1, K+1)), lowBound=0, upBound=1, cat=pl.LpBinary)
u0_2f = pl.LpVariable.dicts('u0', list(range(1, K+1)), lowBound=0)
A2f = {}
for k in range(1, K+1):
    for t in range(1, k):
        A2f[(k,t)] = pl.LpVariable(f"a_{k}_{t}", lowBound=None)

W2f = pl.LpVariable('W', lowBound=0)
n_tf = pl.LpVariable.dicts('n', list(range(1, K+1)), lowBound=0)

yh0 = pl.LpVariable.dicts('yh0', list(range(1, K+1)), lowBound=0)
yp0 = pl.LpVariable.dicts('yp0', list(range(1, K+1)), lowBound=0)
sh = {}
sp = {}
for k in range(1, K+1):
    for t in range(1, k+1):
        sh[(k,t)] = pl.LpVariable(f"sh_{k}_{t}", lowBound=0)
        sp[(k,t)] = pl.LpVariable(f"sp_{k}_{t}", lowBound=0)

xkt = {}
for k in range(1, K+1):
    for t in range(1, k):
        xkt[(k,t)] = pl.LpVariable(f"x_{k}_{t}", lowBound=0)
        m2_fix += -xkt[(k,t)] <= A2f[(k,t)]
        m2_fix += A2f[(k,t)] <= xkt[(k,t)]
    if k == 1:
        m2_fix += u0_2f[1] >= 0
        m2_fix += u0_2f[1] <= M * v2f[1]
    else:
        m2_fix += u0_2f[k] - pl.lpSum(delta[t-1]*xkt[(k,t)] for t in range(1,k)) >= 0
        m2_fix += u0_2f[k] + pl.lpSum(delta[t-1]*xkt[(k,t)] for t in range(1,k)) <= M * v2f[k]

cum_dhat = np.cumsum(d_hat)
for k in range(1, K+1):
    const_expr = I1 + pl.lpSum(u0_2f[i] for i in range(1, k+1)) - cum_dhat[k-1]
    coeffs = {}
    for t in range(1, k+1):
        s_sum = pl.lpSum(A2f[(i,t)] for i in range(max(2,t+1), k+1) if (i,t) in A2f)
        if t == k:
            coeffs[t] = s_sum - 1
        else:
            coeffs[t] = s_sum
    # y_h 包络
    m2_fix += yh0[k] + pl.lpSum(delta[t-1]*sh[(k,t)] for t in range(1, k+1)) >= h * const_expr
    for t in range(1, k+1):
        m2_fix += -sh[(k,t)] <= h * coeffs[t]
        m2_fix += h * coeffs[t] <= sh[(k,t)]
    # y_p 包络
    m2_fix += yp0[k] + pl.lpSum(delta[t-1]*sp[(k,t)] for t in range(1, k+1)) >= p * (-const_expr)
    for t in range(1, k+1):
        m2_fix += -sp[(k,t)] <= -p * coeffs[t]
        m2_fix += -p * coeffs[t] <= sp[(k,t)]

# 目标中的 z 系数来自 c*u(z) 的 z 部分：sum_{k} c*sum_{t<k} a_{k,t} z_t ⇒ 对每个 t 系数为 c*sum_{k>t} a_{k,t}
for t in range(1, K+1):
    coeff_t = pl.lpSum(A2f[(i,t)] for i in range(max(2,t+1), K+1) if (i,t) in A2f)
    m2_fix += -n_tf[t] <= c * coeff_t
    m2_fix += c * coeff_t <= n_tf[t]

# 修正目标：包含 y_h0 + ∑δ·sh 与 y_p0 + ∑δ·sp
robust_y = pl.lpSum(yh0[k] + pl.lpSum(delta[t-1]*sh[(k,t)] for t in range(1,k+1)) +
                    yp0[k] + pl.lpSum(delta[t-1]*sp[(k,t)] for t in range(1,k+1))
                    for k in range(1, K+1))

m2_fix += W2f
m2_fix += W2f >= pl.lpSum(f*v2f[k] + c*u0_2f[k] for k in range(1, K+1)) + robust_y \
                 + pl.lpSum(delta[t-1]*n_tf[t] for t in range(1, K+1))

print("修正后的完全AAR模型已构建。变量数：", len(m2_fix.variables()))

In [None]:
res2f = m2_fix.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=120))
print("Status:", pl.LpStatus[res2f])
print("W2f:", pl.value(W2f))

v2f_sol = np.array([pl.value(v2f[k]) for k in range(1, K+1)])
u0_2f_sol = np.array([pl.value(u0_2f[k]) for k in range(1, K+1)])
print("v2f:", v2f_sol.astype(int))
print("u0_2f:", np.round(u0_2f_sol,2))

In [None]:
# 重新构建并修正静态鲁棒：目标包含 ∑δ·s
m3_fix = pl.LpProblem("Static_Robust_Box_Fix", pl.LpMinimize)

v3f = pl.LpVariable.dicts('v', list(range(1, K+1)), lowBound=0, upBound=1, cat=pl.LpBinary)
u3f = pl.LpVariable.dicts('u', list(range(1, K+1)), lowBound=0)
W3f = pl.LpVariable('W', lowBound=0)

y3h0 = pl.LpVariable.dicts('y_h0', list(range(1, K+1)), lowBound=0)
y3p0 = pl.LpVariable.dicts('y_p0', list(range(1, K+1)), lowBound=0)
S3h = {}
S3p = {}
for k in range(1, K+1):
    for t in range(1, k+1):
        S3h[(k,t)] = pl.LpVariable(f"S3h_{k}_{t}", lowBound=0)
        S3p[(k,t)] = pl.LpVariable(f"S3p_{k}_{t}", lowBound=0)

for k in range(1, K+1):
    m3_fix += u3f[k] >= 0
    m3_fix += u3f[k] <= M * v3f[k]

cum_dhat = np.cumsum(d_hat)
for k in range(1, K+1):
    const_expr = I1 + pl.lpSum(u3f[i] for i in range(1, k+1)) - cum_dhat[k-1]
    # y_h: z 只在 t=k 处系数为 −1
    m3_fix += y3h0[k] + delta[k-1]*S3h[(k,k)] >= h * const_expr
    m3_fix += -S3h[(k,k)] <= -h
    m3_fix += -h <= S3h[(k,k)]
    for t in range(1, k):
        m3_fix += S3h[(k,t)] >= 0
    # y_p
    m3_fix += y3p0[k] + delta[k-1]*S3p[(k,k)] >= p * (-const_expr)
    m3_fix += -S3p[(k,k)] <= +p
    m3_fix += +p <= S3p[(k,k)]
    for t in range(1, k):
        m3_fix += S3p[(k,t)] >= 0

# 修正目标，加入 ∑δ·S（包络项的鲁棒部分）
robust_y3 = pl.lpSum(y3h0[k] + delta[k-1]*S3h[(k,k)] + y3p0[k] + delta[k-1]*S3p[(k,k)]
                     for k in range(1, K+1))

m3_fix += W3f
m3_fix += W3f >= pl.lpSum(f*v3f[k] + c*u3f[k] for k in range(1, K+1)) + robust_y3

res3f = m3_fix.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=120))
print("Static Status:", pl.LpStatus[res3f])
print("W3f:", pl.value(W3f))

v3f_sol = np.array([pl.value(v3f[k]) for k in range(1, K+1)])
u3f_sol = np.array([pl.value(u3f[k]) for k in range(1, K+1)])
print("v3f:", v3f_sol.astype(int))
print("u3f:", np.round(u3f_sol,2))

In [None]:
# 简要对比：修正后的完全AAR vs 修正后的静态鲁棒
compare_fix = pd.DataFrame({
    'period': np.arange(1, K+1),
    'd_hat_14d': d_hat,
    'AAR_v': v2f_sol.astype(int),
    'AAR_u0': u0_2f_sol,
    'Static_v': v3f_sol.astype(int),
    'Static_u': u3f_sol,
})
compare_fix

## 鲁棒优化评估指标与对比
为 AAR 与 Static 两种策略在名义与随机不确定下进行评估，输出：
- 成本分解：固定费、变动费、持有、缺货、总成本
- 服务水平：满足率（fill rate）、缺货频次、期末库存均值/波动
- 风险度量：最坏情形成本、分位成本（VaR）、条件分位成本（CVaR）
- 稳健性：随机场景下成本均值/标准差、P90/P95 等


In [None]:
# 评估函数：给定策略(订单规则)在场景 z 下的轨迹与成本分解

def affine_policy_orders(u0_vec, A_mat, z_vec):
    K = len(u0_vec)
    u = u0_vec.copy().astype(float)
    for k in range(1, K):
        if A_mat is not None:
            u[k] = u0_vec[k] + np.dot(A_mat[k, :k], z_vec[:k])
    return np.maximum(u, 0.0)


def simulate_trajectory(u_vec, v_vec, d_hat_vec, z_vec, I1, costs):
    f, c, h, p = costs
    K = len(u_vec)
    I = np.zeros(K)
    Ypos = np.zeros(K)
    Yneg = np.zeros(K)
    d_real = d_hat_vec + z_vec

    I[0] = I1 + u_vec[0] - d_real[0]
    for k in range(1, K):
        I[k] = I[k-1] + u_vec[k] - d_real[k]
    Ypos = np.maximum(I, 0)
    Yneg = np.maximum(-I, 0)

    fixed = f * np.sum((v_vec > 0.5).astype(int))
    variable = c * np.sum(u_vec)
    holding = h * np.sum(Ypos)
    penalty = p * np.sum(Yneg)
    total = fixed + variable + holding + penalty

    # 服务水平（按期）：若 I[k] >= 0 视为满足；也可按需求满足率定义
    service_rate = np.mean(I >= 0)
    stockouts = int(np.sum(I < 0))

    return {
        'total': total,
        'fixed': fixed,
        'variable': variable,
        'holding': holding,
        'penalty': penalty,
        'service_rate': service_rate,
        'stockouts': stockouts,
        'I_series': I,
        'd_real': d_real,
        'u': u_vec
    }


def evaluate_policy(u0_vec, A_mat, v_vec, d_hat_vec, delta_vec, I1, costs, num_mc=200, rng_seed=2025):
    rng = np.random.default_rng(rng_seed)
    # 名义场景
    z0 = np.zeros_like(delta_vec)
    u_nom = affine_policy_orders(u0_vec, A_mat, z0)
    nom = simulate_trajectory(u_nom, v_vec, d_hat_vec, z0, I1, costs)

    # 全正/全负
    z_plus = +delta_vec
    z_minus = -delta_vec
    u_plus = affine_policy_orders(u0_vec, A_mat, z_plus)
    u_minus = affine_policy_orders(u0_vec, A_mat, z_minus)
    plus = simulate_trajectory(u_plus, v_vec, d_hat_vec, z_plus, I1, costs)
    minus = simulate_trajectory(u_minus, v_vec, d_hat_vec, z_minus, I1, costs)

    # 随机场景评估
    mc = []
    for _ in range(num_mc):
        z = rng.uniform(low=-delta_vec, high=+delta_vec)
        u = affine_policy_orders(u0_vec, A_mat, z)
        mc.append(simulate_trajectory(u, v_vec, d_hat_vec, z, I1, costs))

    def agg(items, key):
        arr = np.array([it[key] for it in items])
        return {
            'mean': float(np.mean(arr)),
            'std': float(np.std(arr)),
            'p90': float(np.quantile(arr, 0.90)),
            'p95': float(np.quantile(arr, 0.95)),
            'p99': float(np.quantile(arr, 0.99)),
        }

    mc_total = agg(mc, 'total')
    mc_service = agg(mc, 'service_rate')
    mc_stockouts = agg(mc, 'stockouts')

    # CVaR@95（简单近似）：取最差5%样本的均值
    totals = np.array([it['total'] for it in mc])
    q95 = np.quantile(totals, 0.95)
    cvar95 = float(totals[totals >= q95].mean())

    return {
        'nominal': nom,
        'plus': plus,
        'minus': minus,
        'mc': {
            'total': mc_total,
            'service_rate': mc_service,
            'stockouts': mc_stockouts,
            'VaR95': float(q95),
            'CVaR95': cvar95
        }
    }


In [None]:
# 提取已求解结果并构造仿射矩阵 A（修正构建）
A2f_mat = np.zeros((K, K))
for k in range(1, K+1):
    for t in range(1, k):
        A2f_mat[k-1, t-1] = pl.value(A2f[(k,t)]) if (k,t) in A2f else 0.0

# 评估 AAR（完全对偶重构）与 Static（修正）
costs_tuple = (f, c, h, p)

aar_eval = evaluate_policy(u0_2f_sol, A2f_mat, v2f_sol, d_hat, delta, I1, costs_tuple)
static_eval = evaluate_policy(u3f_sol, None, v3f_sol, d_hat, delta, I1, costs_tuple)

# 汇总表（核心指标）
summary = pd.DataFrame([
    {
        'method': 'AAR (dual exact)',
        'W* (model)': pl.value(W2f),
        'nominal_total': aar_eval['nominal']['total'],
        'nominal_service': aar_eval['nominal']['service_rate'],
        'worst_plus_total': aar_eval['plus']['total'],
        'worst_minus_total': aar_eval['minus']['total'],
        'MC_mean_total': aar_eval['mc']['total']['mean'],
        'MC_std_total': aar_eval['mc']['total']['std'],
        'MC_VaR95': aar_eval['mc']['VaR95'],
        'MC_CVaR95': aar_eval['mc']['CVaR95'],
        'MC_mean_service': aar_eval['mc']['service_rate']['mean'],
    },
    {
        'method': 'Static robust',
        'W* (model)': pl.value(W3f),
        'nominal_total': static_eval['nominal']['total'],
        'nominal_service': static_eval['nominal']['service_rate'],
        'worst_plus_total': static_eval['plus']['total'],
        'worst_minus_total': static_eval['minus']['total'],
        'MC_mean_total': static_eval['mc']['total']['mean'],
        'MC_std_total': static_eval['mc']['total']['std'],
        'MC_VaR95': static_eval['mc']['VaR95'],
        'MC_CVaR95': static_eval['mc']['CVaR95'],
        'MC_mean_service': static_eval['mc']['service_rate']['mean'],
    }
])
summary

In [None]:
# 成本分解对比（名义场景）
nom_breakdown = pd.DataFrame([
    {
        'method': 'AAR',
        'fixed': aar_eval['nominal']['fixed'],
        'variable': aar_eval['nominal']['variable'],
        'holding': aar_eval['nominal']['holding'],
        'penalty': aar_eval['nominal']['penalty'],
        'total': aar_eval['nominal']['total'],
    },
    {
        'method': 'Static',
        'fixed': static_eval['nominal']['fixed'],
        'variable': static_eval['nominal']['variable'],
        'holding': static_eval['nominal']['holding'],
        'penalty': static_eval['nominal']['penalty'],
        'total': static_eval['nominal']['total'],
    }
])
nom_breakdown

In [None]:
# 随机场景稳定性与风险可视化（总成本分布）
plt.figure(figsize=(10,5))
# 采样一次，重用 evaluate 中的 MC 结果（需要访问里面的分布，简化用近似参数）
# 这里简单再采样，确保可视化存在
rng = np.random.default_rng(2026)
N = 300
AAR_totals = []
Static_totals = []
for _ in range(N):
    z = rng.uniform(low=-delta, high=+delta)
    u_aar = affine_policy_orders(u0_2f_sol, A2f_mat, z)
    AAR_totals.append(simulate_trajectory(u_aar, v2f_sol, d_hat, z, I1, costs_tuple)['total'])
    u_sta = affine_policy_orders(u3f_sol, None, z)
    Static_totals.append(simulate_trajectory(u_sta, v3f_sol, d_hat, z, I1, costs_tuple)['total'])

plt.hist(AAR_totals, bins=20, alpha=0.6, label='AAR total')
plt.hist(Static_totals, bins=20, alpha=0.6, label='Static total')
plt.axvline(np.mean(AAR_totals), color='C0', linestyle='--', label='AAR mean')
plt.axvline(np.mean(Static_totals), color='C1', linestyle='--', label='Static mean')
plt.title('随机Box场景总成本分布对比')
plt.legend()
plt.show()