In [1]:
from pyomo.environ import *
from pyomo.contrib.piecewise import PiecewiseLinearFunction
from pyomo.core.base import TransformationFactory
from pyomo.opt import SolverStatus, TerminationCondition
#from pyomo.kernel import *
#from pyomo.core.kernel.piecewise_library.transforms_nd import piecewise_nd
#from pyomo.core.kernel.piecewise_library import util as pwutil  # 方便自动生成剖分
from dataclasses import dataclass, field
from typing import Callable, List, Dict, Tuple
import numpy as np
import math
import matplotlib.pyplot as plt
import bisect
import itertools as it
#from pyomo.environ import ConcreteModel, Var, Constraint, Expression

In [2]:
def evaluate_Q_at(model, first_stg_vars, first_stg_vals, solver):
    """
    Given y = y_val , minimize obj_expr and return v(y).
    This function temporarily increments the objective and clears it after completion, without changing the model structure.
    """
    # Clear any remaining As/pw/obj (to prevent it from being left over from the previous round)
    del_components(model)
    
    for u, v in zip(first_stg_vars, first_stg_vals):
        u.fix(value(v))
    model.obj = Objective(expr=model.obj_expr, sense=minimize)
    results = solver.solve(model, tee=False)

    status_ok = (results.solver.status == SolverStatus.ok)
    term_ok = (results.solver.termination_condition == TerminationCondition.optimal)
    if not (status_ok and term_ok):
        # check if solution okay
        raise RuntimeError(f"Scenario evaluate at y={first_stg_vals} not optimal: "
                           f"status={results.solver.status}, term={results.solver.termination_condition}")

    v_opt = value(model.obj_expr)
    # clear temporarily objective
    model.del_component('obj')
    for u in first_stg_vars:
        u.unfix()
    return v_opt

def del_components(model):
    for comp in ['obj', 'As', 'pw', 'pw_fun', 'pw_As', 'pw_link']:
        if hasattr(model, comp):
            model.del_component(comp)

def corners_from_bounds(firt_stg_vars):
    """给一组 Pyomo Var 生成所有 box 角点（每维取 lb/ub）"""
    bounds = []
    for y in firt_stg_vars:
        lb, ub = y.lb, y.ub
        if lb is None or ub is None:
            raise ValueError(f"{y.name} 缺少上下界，无法生成角点")
        bounds.append((float(lb), float(ub)))
    # 每维挑 lb/ub 的笛卡尔积
    return list(it.product(*[(lb, ub) for (lb, ub) in bounds]))


def add_nd_piecewise(
    model,
    firt_stg_vars,                 # [x1, x2, ..., xN]  模型里的 first stage Var
    points,                 # [(c11,...,c1N), (c21,...,c2N), ...]  所有节点（同维度 N）
    values,                 # 与 points 对齐的一维 list/array，或 dict{point_tuple: value}
    name="pw",
    relation="==",          # '==', '>='(下界/内逼近), '<='(上界/外逼近)
    round_ndigits=12,       # 为避免浮点比较问题，对坐标做轻微 round
):
    """
    返回 (z, pw)。z 是 Var（或你可将 make_z_var=False 改成返回 Expression）。
    """

    # 维度检查
    if len(points) == 0:
        raise ValueError("points 不能为空")
    N = len(firt_stg_vars)
    for pt in points:
        if len(pt) != N:
            raise ValueError(f"points 中出现与 x_vars 维度不一致的点: {pt}")
        
    del_components(model)

    # 统一坐标的浮点表示，避免查表时精度问题
    def keyize(coords):
        return tuple(round(float(c), round_ndigits) for c in coords)

    norm_points = [keyize(pt) for pt in points]

    # 把 values 统一成 dict 表
    if isinstance(values, dict):
        table = {keyize(k): float(v) for k, v in values.items()}
        # 确保每个 point 都有值
        miss = [pt for pt in norm_points if pt not in table]
        if miss:
            raise KeyError(f"values 缺少这些点的取值: {miss[:5]}{' ...' if len(miss)>5 else ''}")
    else:
        # 视作与 points 对齐的一维序列
        if len(values) != len(points):
            raise ValueError("values 长度应与 points 数量一致（或传 dict）")
        table = {pt: float(v) for pt, v in zip(norm_points, values)}

    # 查表函数（仅在节点上被调用）
    def _f_from_table(*coords):
        return table[keyize(coords)]

    # 创建并挂到模型
    pw = PiecewiseLinearFunction(points=norm_points, function=_f_from_table, name=f"{name}_fun")
    model.add_component(pw.name, pw)

    # 组成表达式
    pw_expr = pw(*firt_stg_vars)

    As = Var(name=f"{name}_As")
    model.add_component(As.name, As)
    if relation == "==":
        link = Constraint(expr=As == pw_expr)
    elif relation == ">=":
        link = Constraint(expr=As >= pw_expr)
    elif relation == "<=":
        link = Constraint(expr=As <= pw_expr)
    else:
        raise ValueError("relation 只能是 '==', '>=', '<='")
    model.add_component(f"{name}_link", link)

    TransformationFactory('contrib.piecewise.convex_combination').apply_to(model)
        
    return As, pw


def clone_and_get_vars(m_old, first_stage_vars):
    """
    克隆模型，并按名字列表返回新模型中的变量组件
    var_names: ['y', 'x', ...]  (只能是容器名，不是元素)
    """
    m_new = m_old.clone()
    first_stage_vars_new = []
    for v in first_stage_vars:
        v_new = m_new.find_component(v.name)
        if v_new is None:
            raise KeyError(f"在新模型里找不到变量 '{v.name}'")
        first_stage_vars_new.append(v_new)
    return m_new, first_stage_vars_new

# delete repeated nodes
def unique_points(points, atol=1e-9):
    out = []
    for p in points:
        if not any(all(abs(a-b) <= atol for a,b in zip(p, q)) for q in out):
            out.append(p)
    return out

In [10]:
def nc_underest(model_list, first_stg_vars_list, m_tmpl_list, target_nodes, picture_shown=False, v_list=False, tolerance=1e-8):
    """
    Parameters:
        #bounds (list): contains 2 float which is lower and upper bound of variable
        model_list (list): model with submodels corresponds to each scenario
        first_stg_var (list): 
        m_tmpl_list (list): [template model, template model first stg variables list]
        target_nodes (float): number of target nodes
        tolerance (float): decide when to stop

    Returns: delta (float): delta
             errors (float): hausdorff error
             y_nodes (list): y node (to make plot)
             as_nodes_list[0] (list): As node value (to make plot)
             ms_list[0] (float): ms for first scenario (to make plot)
    """
    N = len(model_list)
    as_nodes_list = [[] for _ in range(N)]
    ms_list = [None] * N
    new_nodes_list = [None] * N # Storing potential new nodes
    As_min_list = []
    under_tol = 1e-8
    
    add_node_history = []

    # set up solver
    
    solver = SolverFactory('gurobi')
'''    solver.options.update({
        'MIPGap': 1e-4,         
        'MIPGapAbs': 1e-4,       
        'FeasibilityTol': 1e-9,  
        'IntFeasTol':     1e-9,  
        'OptimalityTol': 1e-6,
        'NumericFocus': 2,      
        'ScaleFlag':    1,       
        'Presolve': 2,          
        'Method':  -1,          
        'Crossover': -1,       
        'NonConvex': 2, 
    })'''
    
    
    # start from corner nodes
    first_stg_nodes = corners_from_bounds(first_stg_vars_list[0])
    for i in range(N):
        as_nodes_list[i].extend(
            evaluate_Q_at(model_list[i], first_stg_vars_list[i], node, solver) for node in first_stg_nodes
        )
    print('corner nodes are ', first_stg_nodes)
    print('as_nodes_list are ', as_nodes_list)
    '''
    for node in first_stg_nodes:
        for i in range(N):
            as_nodes_list[i].append(evaluate_Q_at(model_list[i], node, solver))
    '''

    if target_nodes <= len(first_stg_nodes):
        print('target_nodes number should be larger than ',len(first_stg_nodes))
        return

    print('Start from ',len(first_stg_nodes),' nodes')
    print('The goal is to get ',target_nodes,' nodes')
    k_list = []
    for k in range(len(first_stg_nodes)+1,target_nodes+1):
        print('##################################################')
        print('##################################################')
        print('Start adding node ',k)
        k_list.append(k)
        for i in range(N):
            print(' ')
            print('Solving scenario ',i)
            # define piecewise function for each scenario
            del_components(model_list[i])
            As, _ = add_nd_piecewise(
            model_list[i], first_stg_vars_list[i], first_stg_nodes, as_nodes_list[i],
            name="pw", relation="=="
            )
            model_list[i].obj = Objective(expr=model_list[i].obj_expr - As, sense=minimize)
            #results = SolverFactory("gurobi").solve(model_list[i], tee=False)
            results = solver.solve(model_list[i], tee=False)
            if (results.solver.status != SolverStatus.ok) or \
               (results.solver.termination_condition != TerminationCondition.optimal):
                print("⚠ There may be problems with the solution")
                
            ms_list[i] = value(model_list[i].obj)
            # insert new nodes
            new_nodes_list[i] = tuple(value(v) for v in first_stg_vars_list[i])
            print('new node is ',tuple(value(v) for v in first_stg_vars_list[i]))
            print('ms is ',value(model_list[i].obj))

        # define and solve the sum model to check if error at possible new node is too big
        arr = np.array(as_nodes_list, dtype=float, ndmin=2)  
        assum_nodes = arr.sum(axis=0) 

        print('XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
        print(first_stg_nodes)
        print(assum_nodes)

        # build As_sum model and solve for possible node pf max error
        model_sum, model_sum_first_stg_vars = clone_and_get_vars(m_tmpl_list[0], m_tmpl_list[1])
        del_components(model_sum)
        As, pw = add_nd_piecewise(
        model_sum, model_sum_first_stg_vars, first_stg_nodes, assum_nodes,
        name="pw", relation="=="
        )
        model_sum.obj = Objective(expr= As, sense=minimize)

        #results = SolverFactory("gurobi").solve(model_sum, tee=False)
        results = solver.solve(model_sum, tee=False)
        if (results.solver.status == SolverStatus.ok) and (results.solver.termination_condition == TerminationCondition.optimal):
            pass
        else:
            print("Sum model doesn't get solved normally")

        # get the output
        As_min = results.problem.lower_bound
        print(f'As_min at possible new node is {As_min}')
        node_star = tuple(value(v) for v in model_sum_first_stg_vars) 

        if (node_star is None) or (node_star in first_stg_nodes):
            avg = []
            for j in range(len(first_stg_nodes[0])):
                comp_vals = [node[j] for node in first_stg_nodes]
                avg.append(sum(comp_vals) / len(comp_vals))
            node_star = tuple(avg)
            As_min = value(pw(*node_star))
        errors_node_star = 0
        for i in range(N):
            errors_node_star += evaluate_Q_at(model_list[i], first_stg_vars_list[i], node_star, solver)
        print(f'Real value at possible new node is {errors_node_star}')
        errors_node_star = abs(As_min - errors_node_star)
        print(f'error at possible new node is {errors_node_star}')

        sum_ms = sum(ms_i for ms_i in ms_list)
        
        print('Sum *****************************************')
        print('error at y_star is ',errors_node_star)
        print('y_star is ',node_star)
        print('ms_list and sum_ms is ',ms_list,sum_ms)
        if errors_node_star > abs(sum_ms):
            new_node = node_star
            print('new node choosen from error')
        else:
            min_index = np.argmin(ms_list)
            new_node = new_nodes_list[min_index]
            print('new node choosen from ms')
        As_min_list.append(As_min+sum_ms)
        add_node_history.append(new_node)
        print('new node is',new_node)
        print('Current As_min is',As_min_list[-1])
        print('*****************************************')
        print('')
        #######################################################              

        print('current nodes are ', first_stg_nodes)
        print('as_nodes_list are ', as_nodes_list)
        print('new_node is', new_node)
        first_stg_nodes.append(new_node)
        for i in range(N):
            as_nodes_list[i].append(evaluate_Q_at(model_list[i], first_stg_vars_list[i], new_node, solver))

    # define and solve the sum model
    arr = np.array(as_nodes_list, dtype=float, ndmin=2)  
    assum_nodes = arr.sum(axis=0) 

    # build As_sum model and solve for possible node pf max error
    model_sum, model_sum_first_stg_vars = clone_and_get_vars(m_tmpl_list[0], m_tmpl_list[1])
    del_components(model_sum)
    As, _ = add_nd_piecewise(
    model_sum, model_sum_first_stg_vars, first_stg_nodes, assum_nodes,
    name="pw", relation="=="
    )
    model_sum.obj = Objective(expr= As, sense=minimize)
    #results = SolverFactory("gurobi").solve(model_sum, tee=False)
    results = solver.solve(model_sum, tee=False)
    if (results.solver.status == SolverStatus.ok) and (results.solver.termination_condition == TerminationCondition.optimal):
        pass
    else:
        print("Sum model doesn't get solved normally")

    # get the output
    output_lb = results.problem.lower_bound + sum(ms_list)
    print('lower bound is ',output_lb)
    print('node is ',tuple(value(v) for v in model_sum_first_stg_vars) )
    
    return output_lb, first_stg_nodes, [k_list, As_min_list, add_node_history]

IndentationError: unexpected indent (2401058676.py, line 45)

In [11]:
import pyomo.environ as pyo

def build_pid_model(
    T=10,                 # 时间步数
    h=0.1,                # 步长 Δt
    scen=None,            # 单个场景数据: {"Ku":..., "tau":..., "d":[...], "sp":[...]}
    weights=(1.0, 0.01),  # 目标权重 (w_e, w_u)
    bounds={"x":(-20,20), "u":(-20,20), "Kp":(0,100), "Ki":(0,1000), "Kd":(0,1000)},
    use_cvar=False,       # 保留参数，但单场景下一般不需要
    alpha=0.95
):
    """
    单场景 PID 控制模型:
      tau * x_dot + x = Ku * u + d
      u = Kp*e + Ki*I + Kd*de/dt,   e = sp - x,   I_dot = e
    离散化: 隐式欧拉
    """
    assert scen is not None, "请提供一个场景字典"

    Ku, tau, d, sp = scen["Ku"], scen["tau"], scen["d"], scen["sp"]

    m = pyo.ConcreteModel()
    m.T = pyo.RangeSet(0, T)
    m.Tm = pyo.RangeSet(1, T)

    # 一阶段变量 (PID 参数)
    m.Kp = pyo.Var(bounds=bounds["Kp"])
    m.Ki = pyo.Var(bounds=bounds["Ki"])
    m.Kd = pyo.Var(bounds=bounds["Kd"])

    # 二阶段变量
    m.x = pyo.Var(m.T, bounds=bounds["x"])
    m.u = pyo.Var(m.T, bounds=bounds["u"])
    m.e = pyo.Var(m.T)
    m.I = pyo.Var(m.T)

    # 误差 e_t = sp_t - x_t
    def _err_rule(m, t): return m.e[t] == sp[t] - m.x[t]
    m.err_def = pyo.Constraint(m.T, rule=_err_rule)

    # I 动态
    def _I_dyn(m, t): return m.I[t] == m.I[t-1] + h*m.e[t]
    m.I_dyn = pyo.Constraint(m.Tm, rule=_I_dyn)

    # 系统动态 (隐式欧拉)
    def _x_dyn(m, t):
        return m.x[t] == m.x[t-1] + (h/tau)*(-m.x[t] + Ku*m.u[t] + d[t])
    m.x_dyn = pyo.Constraint(m.Tm, rule=_x_dyn)

    # PID 控制律
    def _pid_rule(m, t):
        if t == 0:
            return m.u[t] == m.Kp*m.e[t] + m.Ki*m.I[t]
        return m.u[t] == m.Kp*m.e[t] + m.Ki*m.I[t] + m.Kd*(m.e[t]-m.e[t-1])/h
    m.pid = pyo.Constraint(m.T, rule=_pid_rule)

    # 初值
    m.x0 = pyo.Constraint(expr=m.x[0] == 0)
    m.I0 = pyo.Constraint(expr=m.I[0] == 0)

    # 成本函数 (单场景就是一个值)
    w_e, w_u = weights
    m.cost = pyo.Expression(expr=sum(
        h*(w_e*m.e[t]**2 + w_u*m.u[t]**2) for t in m.T
    ))

    # 目标: 单场景情况下就直接最小化 cost
    #m.obj = pyo.Objective(expr=m.cost, sense=pyo.minimize)
    m.obj_expr = pyo.Expression(expr = m.cost)

    return m,  [m.Kp, m.Ki, m.Kd]

In [14]:
T, h = 30, 0.1
times = [t for t in range(T+1)]

def step_sp(t): return 1.0 if t*h >= 0.5 else 0.0
def make_d(amp): return [amp*math.sin(0.5*t*h) for t in times]
scenarios = {
    1: {"prob": 0.4, "Ku": 3.0, "tau": 2.0, "d": make_d(0.2), "sp": [step_sp(t) for t in times]},
}
'''scenarios = {
    1: {"prob": 0.4, "Ku": 3.0, "tau": 2.0, "d": make_d(0.2), "sp": [step_sp(t) for t in times]},
    2: {"prob": 0.4, "Ku": 2.7, "tau": 1.8, "d": make_d(0.5), "sp": [step_sp(t) for t in times]},
    3: {"prob": 0.2, "Ku": 3.3, "tau": 2.2, "d": make_d(0.8), "sp": [step_sp(t) for t in times]},
}'''
'''scen = {
    "Ku": 3.0,
    "tau": 2.0,
    "d": make_d(0.3),
    "sp": [step_sp(t) for t in times],
}'''
model_list = []
first_stg_vars_list = []
for _, scen in scenarios.items():
    m, m_f_stg_vars = build_pid_model(T=T, h=h, scen=scen)
    model_list.append(m)
    first_stg_vars_list.append(m_f_stg_vars)
bounds={"x":(-20,20), "u":(-20,20), "Kp":(0,100), "Ki":(0,1000), "Kd":(0,1000)}
m_tmpl = pyo.ConcreteModel()
m_tmpl.T = pyo.RangeSet(0, T)
m_tmpl.Tm = pyo.RangeSet(1, T)
m_tmpl.Kp = pyo.Var(bounds=bounds["Kp"])
m_tmpl.Ki = pyo.Var(bounds=bounds["Ki"])
m_tmpl.Kd = pyo.Var(bounds=bounds["Kd"])
m_tmpl_list = [m_tmpl, [m_tmpl.Kp, m_tmpl.Ki, m_tmpl.Kd]]

In [15]:
nc_underest(model_list, first_stg_vars_list, m_tmpl_list, target_nodes=10,
                picture_shown=False, v_list=False, tolerance=1e-8
)

corner nodes are  [(0.0, 0.0, 0.0), (0.0, 0.0, 1000.0), (0.0, 1000.0, 0.0), (0.0, 1000.0, 1000.0), (100.0, 0.0, 0.0), (100.0, 0.0, 1000.0), (100.0, 1000.0, 0.0), (100.0, 1000.0, 1000.0)]
as_nodes_list are  [[2.312013591358772, 0.050724842025162725, 0.04638200880665605, 0.05073328930303467, 0.04546167267642503, 0.05072629135761799, 0.0479443564808759, 0.05073385265649808]]
Start from  8  nodes
The goal is to get  10  nodes
##################################################
##################################################
Start adding node  9
 
Solving scenario  0
new node is  (8.037722090352561e-08, 6.385681815005483, 6.385680598918219)
ms is  -2.2433812830236537
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
[(0.0, 0.0, 0.0), (0.0, 0.0, 1000.0), (0.0, 1000.0, 0.0), (0.0, 1000.0, 1000.0), (100.0, 0.0, 0.0), (100.0, 0.0, 1000.0), (100.0, 1000.0, 0.0), (100.0, 1000.0, 1000.0)]
[2.31201359 0.05072484 0.04638201 0.05073329 0.04546167 0.05072629
 0.04794436 0.05073385]
As_min at possible ne

(-2.1852036205893386,
 [(0.0, 0.0, 0.0),
  (0.0, 0.0, 1000.0),
  (0.0, 1000.0, 0.0),
  (0.0, 1000.0, 1000.0),
  (100.0, 0.0, 0.0),
  (100.0, 0.0, 1000.0),
  (100.0, 1000.0, 0.0),
  (100.0, 1000.0, 1000.0),
  (8.037722090352561e-08, 6.385681815005483, 6.385680598918219),
  (0.0, 4.843309089158998, 5.36517882182643)],
 [[9, 10],
  [np.float64(-2.195283802033924), np.float64(-2.178024935245441)],
  [(8.037722090352561e-08, 6.385681815005483, 6.385680598918219),
   (0.0, 4.843309089158998, 5.36517882182643)]])

In [19]:
solver = pyo.SolverFactory("ipopt")
first_stg_nodes = [(0.0, 0.0, 0.0), (0.0, 0.0, 1000.0), (0.0, 1000.0, 0.0), (0.0, 1000.0, 1000.0), (100.0, 0.0, 0.0), (100.0, 0.0, 1000.0), (100.0, 1000.0, 0.0), (100.0, 1000.0, 1000.0)]
assum_nodes = [4.54023507, 0.05320473, 0.04886529, 0.05321642, 0.04795115, 0.05320593, 0.05042764, 0.05321691]

# build As_sum model and solve for possible node pf max error
model_sum, model_sum_first_stg_vars = clone_and_get_vars(m_tmpl_list[0], m_tmpl_list[1])
del_components(model_sum)
As, pw = add_nd_piecewise(
model_sum, model_sum_first_stg_vars, first_stg_nodes, assum_nodes,
name="pw", relation="=="
)
model_sum.obj = Objective(expr= As, sense=minimize)

#results = SolverFactory("gurobi").solve(model_sum, tee=False)
results = solver.solve(model_sum, tee=False)
if (results.solver.status == SolverStatus.ok) and (results.solver.termination_condition == TerminationCondition.optimal):
    pass
else:
    print("Sum model doesn't get solved normally")

# get the output
As_min = results.problem.lower_bound
print(f'As_min at possible new node is {As_min}')
node_star = tuple(value(v) for v in model_sum_first_stg_vars) 

As_min at possible new node is -inf


In [20]:
print(f'As_min at possible new node is {value(model_sum.obj)}')

As_min at possible new node is 0.04795113247498382
