# 读取大型 log 文件
 - 使用 pandas 的 read_csv 函数配合 chunksize 参数来分块读取大文件，避免一次性将整个文件加载到内存中
 - 设置 sep='\t' 来正确处理制表符分隔的数据
 - 使用 float_precision='high' 来保持高精度数值，因为数据中包含科学计数法表示的小数
 - 包含了错误处理机制，可以处理文件不存在等常见错误
 - 提供了基本的数据信息展示，包括数据维度、前几行数据预览和基本统计信息

In [None]:
import pandas as pd
import os

def read_large_log_file(file_path):
    # 使用chunksize参数来分块读取大文件
    # 设置sep='\t'因为数据是用制表符分隔的
    # 使用float_precision='high'来保持高精度数值
    chunks = pd.read_csv(
        file_path,
        sep='\t',
        float_precision='high',
        chunksize=10000  # 每次读取10000行
    )
    
    # 初始化一个空的DataFrame来存储结果
    df = pd.DataFrame()
    
    # 分块处理数据
    for chunk in chunks:
        # 这里可以对每个chunk进行处理
        # 例如：只保留某些列，或者进行一些计算
        df = pd.concat([df, chunk], ignore_index=True)
        
    return df

# 读取 SIMPACK 自动生成的实时仿真文件
file_path = os.path.join('..', 'SPCK_Model', 'Vehicle4WDB_RealtimeCRV.output', 'Vehicle4WDB_RealtimeCRV.y.log')

# 读取自编 C++ 代码与 SIMPACK 实时联合仿真的结果文件
# file_path = '/home/yaoyao/Documents/myProjects/ROS2WithSPCK/SimResult.log'

try:
    # 读取数据
    df = read_large_log_file(file_path)
    
    # 显示基本信息
    print("SIMPACK Realtime Log 文件基本信息：")
    print(f"行数: {len(df)}")
    print(f"列数: {len(df.columns)}")

    # # 显示数据基本统计信息
    # print("\n数据统计信息：")
    # print(df.describe())

except FileNotFoundError:
    print(f"错误：找不到文件 {file_path}")
except Exception as e:
    print(f"发生错误：{str(e)}")

In [None]:
%matplotlib inline

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import font_manager

# 设置中文字体
plt.rcParams['font.family'] = ['Noto Sans CJK JP']

# 创建图形
fig, ax = plt.subplots(figsize=(12, 6))

# 绘制主曲线
ax.plot(df['Time'], df['$y_w01_rotw'], 'b-', linewidth=1, label='Wheelset 01 Speed')

# 设置图形标题和标签
ax.set_title('Wheel Speed [rad/s]', fontsize=14, pad=15)
ax.set_xlabel('Time [s]', fontsize=12)
ax.set_ylabel('Wheelset 01 Speed [rad/s]', fontsize=12)

# 添加网格
ax.grid(True, linestyle='--', alpha=0.7)

# 添加图例
ax.legend(loc='upper right')

# 设置坐标轴刻度
ax.tick_params(axis='both', which='major', labelsize=10)

# 添加一些基本统计信息
stats_text = f'Max: {df["$y_w01_rotw"].max():.2e}m\n' \
                f'Min: {df["$y_w01_rotw"].min():.2e}m\n' \
                f'Avg: {df["$y_w01_rotw"].mean():.2e}m'
ax.text(0.02, 0.98, stats_text,
        transform=ax.transAxes,
        bbox=dict(facecolor='white', alpha=0.8),
        verticalalignment='top',
        fontsize=10)
#ax.set_ylim(-0.8, 1.0)
#ax.set_xlim(0, 50)

# 优化布局
plt.tight_layout()

# 显示图形
plt.show()

# PID 控制器的实现
 **PIDController 类**
 - kp, ki, kd, n 分别对应 Simulink 中的 P,I,D,N
 - update() 方法内部先做积分，再对差分值进行指数平滑，以实现带滤波的微分环节，然后组合为并行PID输出
 - 物理模型：质量-弹簧-阻尼器系统
 - 示例中直接使用了 kp=2500, ki=4, kd=10, n=5000，与在 Simulink 中调好的参数相一致。


In [None]:
import matplotlib.pyplot as plt

class PIDController:
    """
    并行型 PID 控制器 (带一阶滤波微分)
    形式: Gc(s) = P + I/s + D * [N s / (s + N)]
    """
    def __init__(self, kp, ki, kd, n):
        """
        :param kp: 比例增益 P
        :param ki: 积分增益 I
        :param kd: 微分增益 D
        :param n:  微分滤波系数 N
        """
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.n = n
        
        self.integral = 0.0
        self.prev_error = 0.0
        self.derivative_term = 0.0

    def reset(self):
        """重置控制器内部状态"""
        self.integral = 0.0
        self.prev_error = 0.0
        self.derivative_term = 0.0

    def update(self, error, dt):
        """
        根据当前误差和时间步长 dt 计算控制器输出
        
        :param error: 当前误差
        :param dt:    本次采样周期 (时间步长)
        :return:      PID 输出
        """
        # 累加积分
        self.integral += error * dt
        
        # (未滤波的)微分
        derivative_raw = (error - self.prev_error) / dt
        
        # 一阶滤波 (指数平滑)
        # alpha = n * dt / (1 + n * dt)
        alpha = self.n * dt / (1.0 + self.n * dt)
        self.derivative_term = (1.0 - alpha)*self.derivative_term + alpha * derivative_raw
        
        # PID输出 (并行形式)
        output = (
            self.kp * error +
            self.ki * self.integral +
            self.kd * self.derivative_term
        )
        
        # 更新上一次的误差
        self.prev_error = error
        return output

def main():
    """
    使用示例:
    这里模拟一个简化的质量-弹簧-阻尼器系统:
      m * d2x/dt2 + c * dx/dt + k * x = u(t)
    其中 u(t) 由 PID 控制器提供 (相当于施加在质量块上的外力)，
    目标是让 x(t) 跟踪某个给定设定值 setpoint。
    
    用欧拉法离散更新:
      a = (u - c*v - k*x) / m
      v_{k+1} = v_k + a * dt
      x_{k+1} = x_k + v_k * dt
    """
    
    # -----------------------
    # 1) 创建PID控制器
    #    参考在Simulink里调好的参数:
    kp = 2500
    ki = 4
    kd = 10
    n  = 5000
    pid = PIDController(kp, ki, kd, n)
    
    # -----------------------
    # 2) 定义质量-弹簧-阻尼器系统的参数
    m = 1.0   # 质量
    c = 2.0   # 阻尼系数
    k = 10.0  # 弹簧刚度

    # -----------------------
    # 3) 仿真参数
    dt = 0.001         # 时间步长
    sim_time = 1.0     # 总仿真时间，秒
    steps = int(sim_time / dt)
    
    # -----------------------
    # 4) 初始状态
    setpoint = 1.0     # 目标位置 (米)
    x = 0.0            # 当前位置
    v = 0.0            # 速度
    pid.reset()        # 重置控制器
    
    # -----------------------
    # 5) 用于记录数据，便于可视化
    t_list = []
    x_list = []
    setpoint_list = []
    
    # -----------------------
    # 6) 开始仿真循环
    time = 0.0
    for i in range(steps):
        # 计算误差
        error = setpoint - x
        
        # 调用PID
        control_output = pid.update(error, dt)
        
        # 质量-弹簧-阻尼器系统的运动方程离散化(欧拉法)
        # a = (F - c*v - k*x) / m
        a = (control_output - c*v - k*x) / m
        v = v + a * dt
        x = x + v * dt
        
        # 记录数据
        t_list.append(time)
        x_list.append(x)
        setpoint_list.append(setpoint)
        
        # 增加仿真时间
        time += dt

    # -----------------------
    # 7) 画图
    plt.figure(figsize=(8,4))
    plt.plot(t_list, x_list, label="x(t) - System Output", color='b')
    plt.plot(t_list, setpoint_list, '--', label="Setpoint", color='r')
    plt.title("Mass-Spring-Damper with PID Control")
    plt.xlabel("Time [s]")
    plt.ylabel("Position [m]")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

if __name__ == "__main__":
    main()


# 控制目标量时间序列 $E_Vehicle_RefVelX_LeftWheels
**左车轮通过曲线**

## 复现 SIMPACK Constant Second Derivatives

Excitation for creating an excitation signal in time domain. The signal undergoes up to four different transitions from one to another value. Moreover, this sequence may be continued or repeated. This Excitation generates a sequence of up to four smooth transitions of variable length and between different signal values. After the sequence, the signal may stay on the last value reached, or the sequence may be repeated or continued on another level.
Each transition consists of two half parabolas. Thus the first derivative is assembled from two linear sections and the second derivative from two constant levels.

In [None]:
import numpy as np

class TwoHalfParabolas:
    """
    在时刻 [t_start, t_start+T] 内，用"二段半抛物线"的方式，
    使得 f(t) 从 f_begin 变到 f_end，并保证段首、段末的一阶导数=0。
    二阶导数在前半段为 +a，后半段为 -a；a由 (f_end - f_begin)/T^2 自动计算。
    """
    def __init__(self, t_start, t_end, f_begin, f_end):
        self.t_start = t_start
        self.t_end   = t_end
        self.f_begin = f_begin
        self.f_end   = f_end
        self.T = t_end - t_start

        df = (f_end - f_begin)
        # 若 f_begin==f_end，则 a=0；否则按公式 a = 4Δf / T^2
        if abs(self.T) < 1e-12 or abs(df) < 1e-12:
            # 对于 T=0 或 Δf=0 的情况，这段其实是常值
            self.a = 0
        else:
            self.a = 4.0 * df / (self.T**2)

    def get_value(self, t):
        if t < self.t_start or t > self.t_end:
            return None
        tau = t - self.t_start
        if self.a == 0:
            # 没变化或者 T=0
            return self.f_begin
        
        # 前半段: [0, T/2]
        if tau <= self.T/2:
            return self.f_begin + 0.5*self.a*(tau**2)
        else:
            # 后半段: [T/2, T]
            # 先算中点处 f_mid & v_mid
            tm = self.T/2
            f_mid = self.f_begin + 0.5*self.a*(tm**2)
            v_mid = self.a * tm
            tau_local = tau - tm
            # 后半段加速度 = -a，速度 = v_mid - a*tau_local
            # 位置相对 f_mid = ∫(v_mid - a*tau_local) dτ
            return f_mid + v_mid*tau_local - 0.5*self.a*(tau_local**2)

    def get_derivative(self, t, order=1):
        if t < self.t_start or t > self.t_end:
            return None
        if self.a == 0:
            # 常值段 => 导数=0
            return 0.0 if order>=1 else None
        
        tau = t - self.t_start
        if order == 1:
            # 前半段速度 = a*tau
            # 后半段速度 = v_mid - a*(tau - T/2)
            if tau <= self.T/2:
                return self.a * tau
            else:
                tm = self.T/2
                v_mid = self.a*tm
                tau_local = tau - tm
                return v_mid - self.a*tau_local
        elif order == 2:
            # 前半段加速度 = +a
            # 后半段加速度 = -a
            return self.a if (tau <= self.T/2) else -self.a
        else:
            # 第三阶及以上 = 0
            return 0.0


class PiecewiseConstantSecondDeriv:
    """
    将多段 TwoHalfParabolas 串起来。每段在 times[i]~times[i+1] 之间，
    使 f(t) 从 values[i] 过渡到 values[i+1]，且段首末一阶导数=0。
    """
    def __init__(self, times, values):
        self.segments = []
        n = len(times)
        if len(values) != n:
            raise ValueError("times 与 values 长度需一致")

        for i in range(n - 1):
            seg = TwoHalfParabolas(
                t_start = times[i],
                t_end   = times[i+1],
                f_begin = values[i],
                f_end   = values[i+1]
            )
            self.segments.append(seg)

        self.t_end_total = times[-1]
        self.f_end_total = values[-1]

    def get_value(self, t):
        """
        返回在时刻 t 的 f(t) 值。
        若 t >= self.t_end_total，则默认为持续保持最后值不变。
        """
        for seg in self.segments:
            val = seg.get_value(t)
            if val is not None:
                return val
        # 若超出最后一段，则返回最后值
        if t >= self.t_end_total:
            return self.f_end_total
        return None

    def get_derivative(self, t, order=1):
        """
        返回在时刻 t 的第 order 阶导数。
        若 t >= self.t_end_total，则导数=0（因为保持常值）。
        """
        for seg in self.segments:
            val = seg.get_derivative(t, order=order)
            if val is not None:
                return val
        if t >= self.t_end_total:
            return 0.0
        return None


## 拼接多次过渡的完整示例: 控制目标量时间序列 $E_Vehicle_RefVelX_LeftWheels
 - 演示使用 PiecewiseConstantSecondDeriv（基于“两段半抛物线”插值）
 - 实现渐变时序曲线(4: Constant Second Derivatives)：在 0.3s 保持常值，然后在 3~6s 内从该常值平滑过渡到一个新值，再在 6s 之后保持该新值不变
 - 包含绘图代码，方便观察结果

In [None]:
import numpy as np

v_begin1 = 60/3.6  # 约为 16.6667
v_end2   = (60/3.6)/300 * (300 + 1.5/2)  # 约为 16.708333

# 时间点
t_array = [0, 3, 6]
# 对应在这些时间点的目标值
f_array = [v_begin1, v_begin1, v_end2]  
# 解释:
#  - [0,3] 段: 从 16.6667 -> 16.6667 (a=0, 恒值)
#  - [3,6] 段: 从 16.6667 -> 16.7083 (半抛物线过渡)
# 超过 6s 后自动保持 16.7083

# 构建分段
pw_traj = PiecewiseConstantSecondDeriv(t_array, f_array)

# 取样并画图
import matplotlib.pyplot as plt

def plot_piecewise_trajectory(traj, t_min=0, t_max=10, n_points=500):
    t_samples = np.linspace(t_min, t_max, n_points)
    f_vals = [traj.get_value(t) for t in t_samples]
    v_vals = [traj.get_derivative(t, order=1) for t in t_samples]
    a_vals = [traj.get_derivative(t, order=2) for t in t_samples]

    fig, axs = plt.subplots(3, 1, figsize=(8,8), sharex=True)
    axs[0].plot(t_samples, f_vals, 'b-', label='f(t)')
    axs[0].set_ylabel('f(t)')
    axs[0].grid(True, linestyle='--', alpha=0.5)
    axs[0].legend()

    axs[1].plot(t_samples, v_vals, 'g-', label="f'(t)")
    axs[1].set_ylabel("f'(t)")
    axs[1].grid(True, linestyle='--', alpha=0.5)
    axs[1].legend()

    axs[2].plot(t_samples, a_vals, 'r-', label="f''(t)")
    axs[2].set_xlabel('Time (s)')
    axs[2].set_ylabel("f''(t)")
    axs[2].grid(True, linestyle='--', alpha=0.5)
    axs[2].legend()

    for ax in axs:
        # 画出关键时间点竖线
        ax.axvline(x=3, color='gray', linestyle='--', alpha=0.7)
        ax.axvline(x=6, color='gray', linestyle='--', alpha=0.7)

    plt.tight_layout()
    plt.show()
    ax.grid(True, linestyle='--', alpha=0.7)

# 画图
plot_piecewise_trajectory(pw_traj, t_min=0, t_max=50)



# 开环计算 PID 控制参数
- 先计算误差
- 再计算控制器输出

In [None]:
%matplotlib widget
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import font_manager
plt.rcParams['font.family'] = ['Noto Sans CJK JP']   # 设置中文字体

# PID 控制器的输入值, 等价于 车轮转速换算到纵向运行速度 与 目标纵向速度的差值

# 1) 构建/获取 pw_traj 对象 (分段目标值)
#    （假设已经定义过 PiecewiseConstantSecondDeriv/TwoHalfParabolas）
from_piecewise_times = [0, 3, 6]
from_piecewise_vals  = [16.6667, 16.6667, 16.7083]
pw_traj = PiecewiseConstantSecondDeriv(from_piecewise_times, from_piecewise_vals)

# 2) 对 df["Time"] 逐点计算目标值
df["C_CurveLeft_val"] = [pw_traj.get_value(t) for t in df["Time"]]

# 3) 计算 dX_WL01
# 原公式:  dX_WL01 = C_CurveLeft - ( - df["$Y_WL01"] * (860/2000) )
#           = C_CurveLeft + df["$Y_WL01"] * (860/2000)
df["dX_WL01"] = df["C_CurveLeft_val"] + df["$y_w01_rotw"] * (860/2000)
dX_WL01 = df["dX_WL01"].values
dX_ts = df['Time'].values

# 作图: PID 控制器输入
plt.figure(figsize=(12, 6))
# 绘制主曲线
plt.plot(dX_ts, dX_WL01, 'b-', linewidth=1)

# 设置图形标题和标签
plt.title('Input to PID Controller', fontsize=14, pad=15)
plt.xlabel('Time [s]', fontsize=12)
plt.ylabel('Speed Diff in X [m/s]', fontsize=12)
plt.grid(True)
ax.set_ylim(-0.10, 0.25)
ax.set_ylim(0, 50)
plt.show()

## 单向开环 PID 控制器反馈, 与 SIMPACK 结果对比
**open-loop replay test**

- 已经在 Simulink + SIMPACK 环境里跑过闭环仿真，并得到了一系列随时间变化的“输入误差”序列 (这里是 dX_WL01)
- 用“自定义 PID 控制器”逐点接收这一误差，按对应的时间步长计算出 PID 输出，但并不把输出再反馈到被控系统
- 得到一个“PID 输出”时序，可与 Simulink PID 的输出结果做对比，从而评估二者是否一致。

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

class PIDController:
    """
    并行型 PID 控制器 (带一阶滤波微分)
    形式: Gc(s) = P + I/s + D * [N s / (s + N)]
    """
    def __init__(self, kp, ki, kd, n):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.n = n
        self.integral = 0.0
        self.prev_error = 0.0
        self.derivative_term = 0.0

    def reset(self):
        self.integral = 0.0
        self.prev_error = 0.0
        self.derivative_term = 0.0

    def update(self, error, dt):
        # 积分
        self.integral += error * dt
        
        # 微分 (带一阶滤波)
        if dt > 1e-12:
            derivative_raw = (error - self.prev_error) / dt
        else:
            derivative_raw = 0.0
        
        alpha = self.n * dt / (1.0 + self.n * dt)
        self.derivative_term = (1.0 - alpha)*self.derivative_term + alpha*derivative_raw
        
        # PID 输出
        output = (
            self.kp * error +
            self.ki * self.integral +
            self.kd * self.derivative_term
        )
        
        # 记录误差以备下次计算
        self.prev_error = error
        return output


def open_loop_pid_test(df, pid_params):
    """
    根据 df 里的误差信号 (dX_WL01) 和时间 (Time),
    对自定义PID进行单向开环测试.

    :param df: 必须包含 "Time" 和 "dX_WL01" 两列
    :param pid_params: 一个字典, 包含 {kp, ki, kd, n}
    :return: 一个新的 DataFrame, 包含 PID_Output 列
    """
    
    # 1) 提取时间与误差
    t_vals = df['Time'].values
    err_vals = df['dX_WL01'].values
    
    # 2) 创建并初始化 PID
    pid = PIDController(
        kp=pid_params['kp'],
        ki=pid_params['ki'],
        kd=pid_params['kd'],
        n=pid_params['n']
    )
    pid.reset()
    
    # 3) 准备存放PID输出的数组
    N = len(t_vals)
    pid_outputs = np.zeros(N)
    
    # 4) 逐点调用 PID
    for i in range(N):
        if i == 0:
            dt = 0.0
        else:
            dt = t_vals[i] - t_vals[i-1]
            # 如果遇到非递增时间, 可以做异常处理
            if dt < 0.0:
                raise ValueError("Time sequence is not strictly increasing!")
        
        pid_outputs[i] = pid.update(err_vals[i], dt)
    
    # 5) 将结果附加到 df
    df_result = df.copy()
    df_result['PID_Output'] = pid_outputs
    return df_result

def main():
    # ------------------------------------------------
    # 已经得到以下数组:
    # dX_ts : 时间序列
    # dX_WL01 : 与该时间对齐的误差信号
    # 在此用一些假数据作为示例:
    # （实际使用时, 直接用df["Time"], df["dX_WL01"]）
    # dX_ts = np.linspace(0, 5, 501)         # 0~5秒, 步长0.01
    # # 假设 dX_WL01 由某个外部仿真或实车采样得到
    # dX_WL01 = 0.1 * np.sin(2 * np.pi * 1.0 * dX_ts)  # 仅示例用途
    
    # ------------------------------------------------
    # 将数据放入 DataFrame
    df_test = pd.DataFrame({
        'Time': dX_ts,
        'dX_WL01': dX_WL01
    })

    # 定义与Simulink相同的PID参数:
    pid_params = {
        'kp': 20000, #2500,
        'ki': 4,
        'kd': 0,
        'n': 5000
    }
    
    # ------------------------------------------------
    # 调用单向PID测试函数
    df_result = open_loop_pid_test(df_test, pid_params)
    
    # ------------------------------------------------
    # 绘制对比图: 输入 vs PID输出
    fig, ax = plt.subplots(2, 1, figsize=(10,6), sharex=True)
    
    # 第一幅图: 误差输入
    ax[0].plot(df_result['Time'], df_result['dX_WL01'], 'b-', label='Error (dX_WL01)')
    ax[0].set_ylabel('Error [m/s]')
    ax[0].grid(True)
    ax[0].legend()
    
    # 第二幅图: PID 输出
    ax[1].plot(df_result['Time'], df_result['PID_Output'], 'r-', label='PID Output - Torque')
    ax[1].set_xlabel('Time [s]')
    ax[1].set_ylabel('PID Output Torque [N·m]')
    ax[1].grid(True)
    ax[1].legend()
    
    plt.suptitle('Open-Loop PID Test with Simu dX_WL01 Data')
    plt.tight_layout()
    plt.show()

if __name__ == "__main__":
    main()
