# 案例: 水泵-阀门系统仿真

本案例演示了一个包含水泵和阀门的基本管网系统。系统拓扑为：

`源水库 -> 水泵 -> 管道 -> 阀门 -> 目标水库`

我们将通过在特定时间发布控制消息到消息总线，来手动控制水泵的启停和阀门的开合，并观察系统中水位的变化。

### 1. 导入库并加载配置

In [None]:
import json
import pandas as pd
import matplotlib.pyplot as plt

from swp.core_engine.testing.simulation_harness import SimulationHarness
from swp.simulation_identification.physical_objects.reservoir import Reservoir
from swp.simulation_identification.physical_objects.pipe import Pipe
from swp.simulation_identification.physical_objects.valve import Valve
from swp.simulation_identification.physical_objects.pump import Pump

CONFIG_PATH = 'examples/pump_valve_simulation.json'
with open(CONFIG_PATH, 'r') as f:
    config = json.load(f)

print("配置加载成功！")

plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

### 2. 定义并运行仿真

In [None]:
def run_pump_valve_simulation(cfg):
    print("\n--- 搭建水泵-阀门仿真示例 ---")

    # 1. 仿真和通信设置
    harness = SimulationHarness(cfg['simulation'])
    message_bus = harness.message_bus
    
    # 2. 创建物理组件
    comp_cfgs = cfg['components']
    source_reservoir = Reservoir("source_res", **comp_cfgs['source_reservoir'])
    dest_reservoir = Reservoir("dest_res", **comp_cfgs['dest_reservoir'])
    pump = Pump("pump1", message_bus=message_bus, **comp_cfgs['pump'])
    pipe = Pipe("pipe1", **comp_cfgs['pipe'])
    valve = Valve("valve1", message_bus=message_bus, **comp_cfgs['valve'])

    harness.add_component(source_reservoir)
    harness.add_component(pump)
    harness.add_component(pipe)
    harness.add_component(valve)
    harness.add_component(dest_reservoir)

    # 3. 定义拓扑并构建
    harness.add_connection("source_res", "pump1")
    harness.add_connection("pump1", "pipe1")
    harness.add_connection("pipe1", "valve1")
    harness.add_connection("valve1", "dest_res")
    harness.build()

    # 4. 仿真循环
    num_steps = int(harness.duration / harness.dt)
    events = {event['time']: [] for event in cfg['control_events']}
    for event in cfg['control_events']:
        events[event['time']].append(event)
        
    for i in range(num_steps):
        current_time = i * harness.dt
        print(f"--- 仿真步 {i+1}, 时间: {current_time:.2f}s ---")
        
        # 检查并发布当前时间的控制指令
        if current_time in events:
            for event in events[current_time]:
                if 'message' in event: print(f"\n>>> {event['message']} <<<")
                message_bus.publish(event['topic'], {'control_signal': event['control_signal']})
        
        # 标准化为调用 harness.step()
        harness.step()
        
        print("  组件状态更新:")
        for cid in harness.sorted_components:
            state = harness.components[cid].get_state()
            state_str = ", ".join(f"{k}={v:.2f}" for k, v in state.items() if isinstance(v, (int, float)))
            print(f"    {cid}: {state_str}")
        print("")
        
    print("仿真结束。")
    return harness.history

history = run_pump_valve_simulation(config)

### 3. 可视化结果

In [None]:
def plot_history(simulation_history):
    if not simulation_history:
        print("没有历史数据可供绘制。")
        return
        
    # 将历史记录转换为Pandas DataFrame
    df = pd.DataFrame()
    for step in simulation_history:
        time = step['time']
        row = {'time': time}
        for comp, states in step.items():
            if comp == 'time': continue
            for state, value in states.items():
                row[f"{comp}.{state}"] = value
        df = pd.concat([df, pd.DataFrame([row])], ignore_index=True)
    df = df.set_index('time')

    fig, ax1 = plt.subplots(figsize=(15, 7))
    fig.suptitle('水泵-阀门系统仿真结果', fontsize=16)

    # 绘制两个水库的水位
    ax1.plot(df.index, df['source_res.water_level'], 'b-', label='源水库水位 (m)')
    ax1.plot(df.index, df['dest_res.water_level'], 'g-', label='目标水库水位 (m)')
    ax1.set_xlabel('时间 (s)')
    ax1.set_ylabel('水位 (m)', color='b')
    ax1.tick_params(axis='y', labelcolor='b')
    ax1.grid(True)

    # 绘制泵的流量
    ax2 = ax1.twinx()
    ax2.plot(df.index, df['pump1.outflow'], 'r--', label='水泵流量 (m³/s)')
    ax2.set_ylabel('流量 (m³/s)', color='r')
    ax2.tick_params(axis='y', labelcolor='r')

    lines, labels = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax2.legend(lines + lines2, labels + labels2, loc='upper right')

    plt.show()

plot_history(history)