# Load Data

In [None]:
%load_ext autoreload
%autoreload 2
# https://pykoopman.readthedocs.io/en/master/tutorial_koopman_edmdc_for_vdp_system.html

In [None]:
from robot_tools.trajer import TrajTools, recorder, TrajsPainter, TrajInfo

path = "trajs_recorder_0211_17.json"  # trajs_recorder_0211_17
def load_data(path):
    """Read a recorded trajectory file and build observation/action trajectory sets.

    The file is parsed with ``recorder.json_process`` and then passed twice to
    ``TrajTools.construct_trajs``: once for the ``"observation"`` key and once
    for the ``"action"`` key, both with horizontal ("h") series/mixed layouts
    and ``show_info=True``.

    Args:
        path: Location of the recorder JSON file.

    Returns:
        A 4-tuple ``(obs_trajs, act_trajs, raw_state_info, raw_act_info)`` in
        that order, i.e. both trajectory containers first, then both info
        objects.
    """
    records = recorder.json_process(path)
    # Both calls use exactly the same layout options.
    layout_options = dict(series_type="h", mixed_type="h", show_info=True)
    observations, observation_info = TrajTools.construct_trajs(
        records, "observation", **layout_options
    )
    actions, action_info = TrajTools.construct_trajs(
        records, "action", **layout_options
    )
    return observations, actions, observation_info, action_info

# Load the recorded trajectories once; everything below slices these objects.
obs_trajs, act_trajs, raw_state_info, raw_act_info = load_data(path)

# ? Why does the type suddenly need to be declared here?
# (Bare annotations: they bind nothing at runtime and only give editors /
# type checkers the TrajInfo type for these two names.)
raw_state_info:TrajInfo
raw_act_info:TrajInfo

# Training data
# Index 2 is used as the "mixed" layout and index 1 as the "series" layout
# (presumably matching what construct_trajs returns — TODO confirm).
# Only the first two rows of the observations are kept; the actions are used as-is.
RAW_STATE_MIXED = obs_trajs[2][:2,:]
RAW_ACTION_MIXED = act_trajs[2]
RAW_STATE_SERIES = obs_trajs[1][:2,:]
RAW_ACTION_SERIES = act_trajs[1]
# States and actions must line up element-for-element in both layouts.
assert RAW_STATE_MIXED.shape == RAW_ACTION_MIXED.shape
assert RAW_ACTION_SERIES.shape == RAW_STATE_SERIES.shape
# Force the point dimension to 2 so the info objects agree with the sliced arrays.
raw_state_info.points_dim = raw_act_info.points_dim = 2

In [None]:
# draw 2D raw data
# Number of points passed as the upper bound of the plotted range below.
show_points_num = 30
# State trajectories: small 3x3 figure, plotted via plot_2D_features.
# NOTE(review): three axis labels are supplied although points_dim was forced
# to 2 earlier — confirm the third label ("yaw") is simply ignored.
raw_state_painter = TrajsPainter(RAW_STATE_MIXED, raw_state_info)
raw_state_painter.figure_size_2D = (3, 3)
raw_state_painter.features_axis_labels = ["x", "y", "yaw"]
# Arguments are presumably (point range, trajectory indices, feature indices) — TODO confirm.
raw_state_painter.plot_2D_features((0, show_points_num), (0,), (0, 1), title="state trajs")
# Action trajectories: same settings as the state plot.
raw_act_painter = TrajsPainter(RAW_ACTION_MIXED, raw_act_info)
raw_act_painter.figure_size_2D = (3, 3)
raw_act_painter.features_axis_labels = ["x", "y", "yaw"]
raw_act_painter.plot_2D_features((0, show_points_num), (0,), (0, 1), title="action trajs")

# Quick sanity check: show the first four columns of the series-layout arrays.
print(RAW_STATE_SERIES[:, 0:4])
print(RAW_ACTION_SERIES[:, 0:4])
# print(RAW_STATE_MIXED[:, 0:4])
# print(RAW_ACTION_MIXED[:, 0:4])

In [None]:
def get_XYU(start_point, points_num, trajs):
    """Cut matching state / next-state / input windows out of the raw series.

    Reads the module-level ``RAW_STATE_SERIES``, ``RAW_ACTION_SERIES``,
    ``raw_state_info`` and ``raw_act_info``.

    Args:
        start_point: First point index of the window.
        points_num: Number of points in the window.
        trajs: Trajectory selection forwarded to
            ``TrajTools.get_sub_series_trajs``.

    Returns:
        ``(X, Y, U, X_info, Y_info, U_info)`` where X and U use the window
        ``(start_point, end_point)`` and Y uses the same window shifted by one
        point, i.e. ``(start_point + 1, end_point + 1)``.

    Raises:
        AssertionError: If the window ends past the first trajectory's point
            count, or if either info object's ``points_dim`` is not 2.
    """
    end_point = int(start_point + points_num)
    # NOTE(review): Y is cut up to end_point + 1, yet only end_point is checked
    # here — confirm the range convention of get_sub_series_trajs.
    assert end_point <= raw_state_info.each_points_num[0]
    assert raw_state_info.points_dim == raw_act_info.points_dim == 2

    current_window = (start_point, end_point)
    shifted_window = (start_point + 1, end_point + 1)

    X, X_info = TrajTools.get_sub_series_trajs(
        RAW_STATE_SERIES, raw_state_info, current_window, trajs
    )
    Y, Y_info = TrajTools.get_sub_series_trajs(
        RAW_STATE_SERIES, raw_state_info, shifted_window, trajs
    )
    U, U_info = TrajTools.get_sub_series_trajs(
        RAW_ACTION_SERIES, raw_act_info, current_window, trajs
    )
    return X, Y, U, X_info, Y_info, U_info

def draw_XYU(X, Y, U, X_info, Y_info, U_info, points_num, stage):
    """Plot the input, state and next-state windows against time.

    Three figures are produced in the order U, X, Y. Each uses its own
    ``TrajsPainter`` with a dedicated title, label and line style, and is
    drawn with ``plot_features_with_t`` over the range ``(0, points_num)``.

    Args:
        X, Y, U: Trajectory arrays for states, next states and inputs.
        X_info, Y_info, U_info: Info objects paired with the arrays above.
        points_num: Upper bound of the plotted point range.
        stage: Text inserted into the titles, rendered as ``"{stage}ing"``.
    """
    # (data, info, shared title, legend label, line style) for each figure.
    figure_specs = (
        (U, U_info, f"Inputs Trajectories for {stage}ing", " U", "--r"),
        (X, X_info, f"T States Trajectories for {stage}ing", "X", "-b"),
        (Y, Y_info, f"T+1 States Trajectories for {stage}ing", "Y", "-g"),
    )
    for series, series_info, share_title, legend_label, line_style in figure_specs:
        painter = TrajsPainter(series, series_info)
        painter.features_sharetitle = share_title
        painter.features_self_labels = legend_label
        painter.features_lines = line_style
        painter.features_axis_labels = ["x", "y"]
        painter.plot_features_with_t((0, points_num), (0,), (0, 1))

# get sub series for training
# Training window: points_num_for_train points starting at start_point_for_train,
# taken from trajectory 0.
points_num_for_train = 400
start_point_for_train = 0
trajs = (0,)

(
    X_for_train,
    Y_for_train,
    U_for_train,
    X_info_for_train,
    Y_info_for_train,
    U_info_for_train,
) = get_XYU(start_point_for_train, points_num_for_train, trajs)
# Visual check of the training windows (inputs, states, next states).
draw_XYU(
    X_for_train,
    Y_for_train,
    U_for_train,
    X_info_for_train,
    Y_info_for_train,
    U_info_for_train,
    points_num_for_train,
    "Train",
)

# get sub series for testing
# Test window starts where the training window ends (400), same trajectory.
start_point_for_test = 400
points_num_for_test = 98
# NOTE(review): end_point is not read again in the visible cells; get_XYU
# recomputes its own local value.
end_point = int(start_point_for_test + points_num_for_test)
trajs = (0,)

(
    X_for_test,
    Y_for_test,
    U_for_test,
    X_info_for_test,
    Y_info_for_test,
    U_info_for_test,
) = get_XYU(start_point_for_test, points_num_for_test, trajs)

# Visual check of the test windows.
draw_XYU(
    X_for_test,
    Y_for_test,
    U_for_test,
    X_info_for_test,
    Y_info_for_test,
    U_info_for_test,
    points_num_for_test,
    "Test",
)

# Koopman using EDMDc

## Training

In [None]:
import pykoopman as pk
import warnings
import numpy as np
np.random.seed(42)  # for reproducibility
# NOTE(review): this silences every warning category for the rest of the
# session, including any raised while fitting — consider narrowing the filter.
warnings.filterwarnings("ignore")
from pykoopman.regression import EDMDc
from pykoopman.observables import RadialBasisFunction, Polynomial

def simulate_koopman(model, X, U, points_num):
    """Roll the fitted model forward from the first state of ``X``.

    Only a single trajectory is handled, so a simplified scheme is used: the
    first column of ``X`` is the initial state and the model is simulated for
    ``X.shape[1] - 1`` steps using the leading rows of ``U.T`` as inputs.

    Args:
        model: Object exposing ``simulate(x0, u, n_steps=...)``.
        X: Reference states, one column per time step.
        U: Inputs, one column per time step.
        points_num: Accepted for interface compatibility but not used; the
            horizon is always taken from ``X.shape[1]``.

    Returns:
        ``(Xtrue, Xkoop)``: ``X`` itself and the simulated states with the
        initial state prepended, laid out one column per time step.
    """
    initial_state = X[:, 0]

    # Shape printout kept from the original as a quick sanity check.
    print(initial_state.shape)
    print(U.shape)

    horizon = X.shape[1]  # supersedes the points_num argument
    controls = U.T[:horizon, :]
    rollout = model.simulate(initial_state, controls, n_steps=horizon - 1)
    # Prepend the initial state as the first row, then go back to column-per-step.
    Xkoop = np.vstack([initial_state, rollout]).T
    return X, Xkoop

def draw_simulate_result(Xtrue, Xkoop, start_point, points_num, stage, method):
    """Overlay reference and simulated state trajectories against time.

    The reference ``Xtrue`` is drawn first (dashed red, labelled "True"); the
    simulated ``Xkoop`` is then drawn on the same axis (solid blue, labelled
    with ``method``). Both curves use the same point window
    ``(start_point, start_point + points_num)`` so they stay aligned even when
    ``start_point`` is not zero.

    Args:
        Xtrue: Reference states, series layout ("series_h").
        Xkoop: Simulated states, same layout as ``Xtrue``.
        start_point: First point index of the plotted window.
        points_num: Number of points in the plotted window.
        stage: Text inserted into the figure title as ``"{stage}ing"``.
        method: Legend label for the simulated curve.
    """
    end_point = int(start_point + points_num)
    points_range = (start_point, end_point)

    # One info object, built from Xtrue, describes both arrays.
    # `consruct` is spelled exactly as the original code calls it.
    shared_info = TrajInfo.consruct(
        Xtrue,
        "series_h",
        1,
    )

    trajs_painter = TrajsPainter()
    trajs_painter.update_trajs(Xtrue, shared_info)
    trajs_painter.features_self_labels = "True"
    trajs_painter.features_lines = "--r"
    trajs_painter.features_sharetitle = f"Regration on {stage}ing data"
    trajs_painter.features_axis_labels = ["x", "y", "yaw"]
    axis = trajs_painter.plot_features_with_t(
        points_range, (0,), (0, 1), return_axis=True
    )

    # Reuse the painter (and the axis returned above) for the prediction.
    trajs_painter.update_trajs(Xkoop, shared_info)
    trajs_painter.features_self_labels = method
    trajs_painter.features_lines = "-b"
    # Same window as the reference curve (was hard-coded to start at 0).
    trajs_painter.plot_features_with_t(points_range, (0,), (0, 1), given_axis=axis)

def get_mse_error(Xtrue, Xkoop):
    """Return the root-mean-square error between two equally shaped arrays.

    Despite the function name, the value is the RMSE: the norm of the
    element-wise difference divided by the square root of the element count.

    Args:
        Xtrue: Reference array.
        Xkoop: Predicted array; must have the same shape as ``Xtrue``.

    Returns:
        The RMSE as a floating-point scalar.

    Raises:
        AssertionError: If the two shapes differ.
    """
    assert Xtrue.shape == Xkoop.shape, f"Xtrue.shape:{Xtrue.shape}, Xkoop.shape:{Xkoop.shape}"
    residual_norm = np.linalg.norm(Xtrue - Xkoop)
    element_count = Xkoop.size
    return residual_norm / np.sqrt(element_count)

In [None]:
# Regressor: EDMD with control, default settings.
EDMDc_ = EDMDc()

# The bare string below is a section marker for the radial-basis-function
# observables; every variant in this section is currently commented out.
"""径向基方法"""
# Convergent fit
# rbf_type = "thinplate"  # 'thinplate' polyharmonic gauss invquad invmultquad
# regressor = RadialBasisFunction(
#     rbf_type=rbf_type,
#     n_centers=10,
#     centers=None,
#     kernel_width=1.0,
#     polyharmonic_coeff=2.0,
#     include_state=True,
# )
# Divergent fit
# EDMDc_ = EDMDc()
# centers = np.random.uniform(-1,1,(2,5))
# regressor = RadialBasisFunction(
#     rbf_type="thinplate",
#     n_centers=centers.shape[1],
#     centers=centers,
#     kernel_width=1,
#     polyharmonic_coeff=1,
#     include_state=True,
# )
# test
# rbf_type = "polyharmonic"  # 'thinplate' polyharmonic gauss invquad invmultquad
# regressor = RadialBasisFunction(
#     rbf_type=rbf_type,
#     n_centers=10,
#     centers=None,
#     kernel_width=1.0,
#     polyharmonic_coeff=2.0,
#     include_state=True,
# )
# The bare string below is a section marker for the polynomial observables.
"""多项式方法"""
# Good fit
# NOTE: despite its name, `regressor` holds the observables object; it is
# passed as `observables=` below, while EDMDc_ is the actual regressor.
regressor = Polynomial(degree=2)
# Wrong fit
# regressor = Polynomial(degree=2, include_bias=True, interaction_only=True)

# Fit on the training windows; the arrays are transposed before being passed
# (presumably to one row per time step — TODO confirm against pykoopman's fit).
model = pk.Koopman(observables=regressor, regressor=EDMDc_)
model.fit(X_for_train.T, y=Y_for_train.T, u=U_for_train.T)

In [None]:
# Print the fitted model's A, B, C and W attributes, each followed by a blank
# line. Each print is a plain statement: the former `print(x), print("")` form
# built a tuple, so the notebook displayed a stray `(None, None)` as the cell's
# output value.
print(model.A, end="\n\n")
print(model.B, end="\n\n")
print(model.C, end="\n\n")
print(model.W, end="\n\n")

In [None]:
# Simulate the fitted model on the training set
Xtrue_train, Xkoop_train = simulate_koopman(model, X_for_train, U_for_train, points_num_for_train)
# Plot reference vs. simulated states
# NOTE(review): start_point_for_train is re-assigned here (same value as in the
# split cell) and 400 repeats points_num_for_train.
start_point_for_train = 0
max_train_draw = 400
draw_simulate_result(Xtrue_train, Xkoop_train, start_point_for_train, max_train_draw, "train", "EDMDc")
# Last expression of the cell: its value is shown as the cell output.
get_mse_error(Xtrue_train, Xkoop_train)

## Testing

In [None]:
# Simulate the fitted model on the test set
Xtrue_test, Xkoop_test = simulate_koopman(model, X_for_test, U_for_test, points_num_for_test)
# Plot reference vs. simulated states
# start_test_draw / max_test_draw are also reused by the least-squares test cell.
start_test_draw = 0
max_test_draw = 98
draw_simulate_result(Xtrue_test, Xkoop_test, start_test_draw, max_test_draw, "test", "EDMDc")
# Last expression of the cell: its value is shown as the cell output.
get_mse_error(Xtrue_test, Xkoop_test)

# 最小二乘拟合对比

In [None]:
import numpy as np
from robot_tools.datar import least_squares
# Stack states on top of inputs so each column is the regressor [x; u].
X_L_for_train = np.vstack([X_for_train, U_for_train])
# Sanity checks: first four columns and the shapes going into the fit.
print(X_L_for_train[:, 0:4])
print(X_L_for_train.shape)
print(Y_for_train.shape)
# Fit the linear map; K is later applied as K @ [x; u] in simulate_least_squares.
K = least_squares(X_L_for_train.T, Y_for_train.T)
print(K)

def simulate_least_squares(X, U, K, n_steps):
    """Roll a linear model ``x_next = K @ [x; u]`` forward from ``X[:, 0]``.

    Args:
        X: Reference states, one column per time step; only the first column
            is used as the initial state.
        U: Inputs, one column per time step; columns ``0 .. n_steps - 1`` are
            consumed.
        K: Matrix applied to the concatenated ``[x, u]`` vector.
        n_steps: Number of forward steps to simulate.

    Returns:
        ``(X, X_predict)`` where ``X_predict`` has ``n_steps + 1`` columns:
        the initial state followed by the simulated states.
    """
    initial_state = X[:, 0]
    # One row per simulated step; transposed together with x0 at the end.
    rollout = np.zeros((n_steps, initial_state.shape[0]))
    state = initial_state
    for step in range(n_steps):
        regressor = np.concatenate((state, U[:, step]))
        state = np.dot(K, regressor)
        rollout[step, :] = state
    trajectory = np.vstack([initial_state, rollout])  # add initial point
    return X, trajectory.T

## Training

In [None]:
# Roll the least-squares model over the training window; n_steps is one less
# than the window length because the initial state is prepended to the result.
Xtrue_train, Xleast_train = simulate_least_squares(
    X_for_train, U_for_train, K, n_steps=points_num_for_train - 1
)
# Plot reference vs. simulated states
start_train_draw = 0
max_train_draw = 400
draw_simulate_result(
    Xtrue_train,
    Xleast_train,
    start_train_draw,
    max_train_draw,
    "train",
    "LeastSquares",
)
# Last expression of the cell: its value is shown as the cell output.
get_mse_error(Xtrue_train, Xleast_train)

## Testing

In [None]:
# Roll the least-squares model over the test window; n_steps is one less than
# the window length because the initial state is prepended to the result.
Xtrue_test, Xleast_test = simulate_least_squares(
    X_for_test, U_for_test, K, n_steps=points_num_for_test - 1
)
# Plot reference vs. simulated states
# (start_test_draw / max_test_draw come from the Koopman test cell above).
# NOTE(review): the label here is "Least Squares" while the training cell uses
# "LeastSquares".
draw_simulate_result(
    Xtrue_test, Xleast_test, start_test_draw, max_test_draw, "test", "Least Squares"
)
# Last expression of the cell: its value is shown as the cell output.
get_mse_error(Xtrue_test, Xleast_test)

# 2D轨迹绘制

## Training

In [None]:
# Upper bound of the plotted point range (also reused by the testing cell below).
show_points_num = 15
# Draw the reference training trajectory first and keep its axis ...
raw_state_painter = TrajsPainter()
raw_state_painter.update_trajs(Xtrue_train, X_info_for_train)
raw_state_painter.trajs_labels = "Training trajectories"
axis = raw_state_painter.plot_2D_features((0, show_points_num), (0,), (0, 1), return_axis=True)
# ... then overlay the Koopman simulation on that axis with a distinct
# line style and marker size.
raw_state_painter.update_trajs(Xkoop_train, X_info_for_train)
raw_state_painter.trajs_labels = "Koopman prediction on training trajectories"
raw_state_painter.trajs_lines = "->r"
raw_state_painter.trajs_markersize = 3
raw_state_painter.plot_2D_features((0, show_points_num), (0,), (0, 1), given_axis=axis)

## Testing

In [None]:
# Same overlay as the training cell, on the test window. Relies on
# show_points_num set in the training 2D-plot cell.
raw_state_painter = TrajsPainter()
raw_state_painter.update_trajs(Xtrue_test, X_info_for_test)
raw_state_painter.trajs_labels = "Testing trajectories"
axis = raw_state_painter.plot_2D_features((0, show_points_num), (0,), (0, 1), return_axis=True)
# Overlay the Koopman simulation on the axis returned above.
raw_state_painter.update_trajs(Xkoop_test, X_info_for_test)
raw_state_painter.trajs_labels = "Koopman prediction on testing trajectories"
raw_state_painter.trajs_lines = "->r"
raw_state_painter.trajs_markersize = 3
raw_state_painter.plot_2D_features((0, show_points_num), (0,), (0, 1), given_axis=axis)

# 离线RL训练