In [1]:
# NOTE: every file path in this notebook is an absolute path. Replace them with paths relative to the project root before running on another machine; each file lives in the folder named in its path.
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import gym
import matplotlib.pyplot as plt
import random
import argparse
from collections import OrderedDict
from copy import copy
from tqdm import tqdm, trange
import time
import scipy
import scipy.linalg
from scipy.integrate import odeint
import sys
import os
os.chdir(r'/media/serena/study/Vscode_works/python_Vscode/mamba/Nonlinear_LSPN/')
import csv
sys.path.append("Predict_Model_Train/")
sys.path.append("utility_LSPN/")
from Utility import data_collecter
from LSPN_test import LSPN_Mamba
os.environ['KMP_DUPLICATE_LIB_OK'] = "TRUE"


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Internal method identifiers. eval_err / draw_LSPN_only_eval_err index into
# this list, and the plotting cell uses the same strings in result file names.
Methods = [
    "KoopmanDerivative",
    "KoopmanRBF",
    "KNonlinear",
    "KNonlinearRNN",
    "KoopmanU",
    "KoopmanNonlinearA",
    "KoopmanNonlinear",
    "KNonlinearmamba",
]
# Display names, position-aligned with `Methods` (same index = same method).
Method_names = [
    "KoopmanDerivative",
    "KoopmanRBF",
    "KDNN",
    "KRNN",
    "DKUC(ours)",
    "DKAC(ours)",
    "DKN(ours)",
    "KNonlinearmamba",
]

In [3]:
def eval_err(suffix,env_name,method_index):
    """Evaluate the multi-step prediction error of one trained model and save the statistics.

    The checkpoint is looked up in the ``Mamba_data_raw/<suffix>`` folder as a
    file whose name starts with ``<method>_<env_name>`` and ends with ``.pth``.
    The matching network is rebuilt, the stored weights are loaded, and
    ``lka.K_loss`` is evaluated on 4 freshly collected test sets (50
    trajectories x 1000 steps each). The per-step results are averaged over the
    4 repetitions and written to ``LSPN_predict_data/steps<steps><suffix><env_name>_<method>.npy``.

    Args:
        suffix: Name of the checkpoint sub-folder; also embedded in the output file name.
        env_name: Environment name passed to ``data_collecter``.
        method_index: Index into the module-level ``Methods`` list.

    Returns:
        Tuple ``(max_mean, max_std, mean_mean, mean_std)``: mean and standard
        deviation over the repetitions of the two arrays returned by
        ``lka.K_loss`` (presumably the per-step max and mean prediction error -
        confirm against the learner module). Each array has length ``steps``.

    Raises:
        ValueError: If the selected method has no learner module wired up here.
        FileNotFoundError: If no matching ``.pth`` checkpoint exists in the folder.
    """
    method = Methods[method_index]
    root_path = "/media/serena/study/Vscode_works/python_Vscode/mamba/Nonlinear_LSPN/DATA/Mamba_data_raw/"+suffix
    print(method)
    # Each method has its own learner module that provides K_loss (and, for
    # most methods, the Network class); import only the one that is needed.
    if method.endswith("KNonlinear"):
        import Learn_Knonlinear as lka
    elif method.endswith("KNonlinearRNN"):
        import Learn_Knonlinear_RNN as lka
    elif method.endswith("KoopmanNonlinear"):
        import Learn_KoopmanNonlinear_with_KlinearEig as lka
    elif method.endswith("KoopmanNonlinearA"):
        import Learn_KoopmanNonlinearA_with_KlinearEig as lka
    elif method.endswith("KoopmanU"):
        import Learn_Koopman_with_KlinearEig as lka
    elif method.endswith("KNonlinearmamba"):
        import Learn_Knonlinear_mamba as lka
    else:
        # Previously this fell through and crashed later on an unbound name.
        raise ValueError(f"eval_err does not support method {method!r}")

    # Locate the checkpoint. If several files match, the last one in directory
    # listing order is used (unchanged from the previous behaviour).
    prefix = method+"_"+env_name
    model_path = None
    for fname in os.listdir(root_path):
        if fname.startswith(prefix) and fname.endswith(".pth"):
            model_path = fname
    if model_path is None:
        raise FileNotFoundError(f"no checkpoint matching '{prefix}*.pth' found in {root_path}")

    Data_collect = data_collecter(env_name)
    udim = Data_collect.udim
    Nstates = Data_collect.Nstates
    layer_width = 128
    # NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
    dicts = torch.load(root_path+"/"+model_path,map_location=torch.device('cpu'))
    state_dict = dicts["model"]

    # Rebuild the architecture, using the layer sizes stored in the checkpoint where available.
    if method.endswith("KNonlinear"):
        Elayer = dicts["Elayer"]
        net = lka.Network(layers=Elayer,u_dim=udim)
    elif method.endswith("KNonlinearRNN"):
        net = lka.Network(input_size=udim+Nstates,output_size=Nstates,hidden_dim=layer_width, n_layers=3)
    elif method.endswith("KNonlinearmamba"):
        net = LSPN_Mamba(
            # This module uses roughly 3 * expand * d_model^2 parameters
            d_model=3,   # Model dimension d_model
            d_state=8,   # SSM state expansion factor
            d_conv=4,    # Local convolution width
            expand=4,    # Block expansion factor
        ).to("cuda")
    elif method.endswith("KoopmanNonlinear") or method.endswith("KoopmanNonlinearA"):
        layer = dicts["layer"]
        blayer = dicts["blayer"]
        NKoopman = layer[-1]+Nstates
        net = lka.Network(layer,blayer,NKoopman,udim)
    elif method.endswith("KoopmanU"):
        layer = dicts["layer"]
        NKoopman = layer[-1]+Nstates
        net = lka.Network(layer,NKoopman,udim)
    net.load_state_dict(state_dict)
    total_params = sum(p.numel() for p in net.parameters())
    print(f"Total parameters: {total_params}")
    net.double()

    Samples = 50
    steps = 1000
    # Fixed seeds so the randomly generated test data is repeatable.
    random.seed(2022)
    np.random.seed(2022)
    times = 4
    max_loss_all = np.zeros((times,steps))
    mean_loss_all = np.zeros((times,steps))
    with torch.no_grad():
        for i in trange(times, desc="predicting", unit="times"):
            # A fresh test set is collected for every repetition (nothing is cached on disk).
            test_data = Data_collect.collect_koopman_data(Samples,steps)
            max_loss,mean_loss = lka.K_loss(test_data,net,udim)
            max_loss_all[i] = max_loss.reshape(-1)
            mean_loss_all[i] = mean_loss.reshape(-1)

    # Aggregate over the repetitions (axis 0); each result has one entry per step.
    max_mean = np.mean(max_loss_all,axis=0)
    max_std = np.std(max_loss_all,axis=0)
    mean_mean =  np.mean(mean_loss_all,axis=0)
    mean_std =  np.std(mean_loss_all,axis=0)
    # Rows of the saved array: 0 = max_mean, 1 = max_std, 2 = mean_mean, 3 = mean_std.
    np.save("/media/serena/study/Vscode_works/python_Vscode/mamba/Nonlinear_LSPN/DATA/LSPN_predict_data/steps{}".format(steps)+suffix+env_name+"_"+method+".npy",np.array([max_mean,max_std,mean_mean,mean_std]))
    return max_mean,max_std,mean_mean,mean_std

In [4]:
# Checkpoint sub-folder to evaluate (alternatives kept for quick switching).
suffix = "mamba_test6"
# suffix = "mamba_test_50_5k_2w"       # (author's note: 6,8,9)
# suffix = "mamba_dampingpendulum"

# Environment to evaluate on.
env_name = "DampingPendulum"
# env_name = "MountainCarContinuous-v0"
# env_name = "Pendulum-v1"

# Index 7 in `Methods` is "KNonlinearmamba"; add more indices to evaluate other methods.
for i in (7,):
    eval_err(suffix, env_name, method_index=i)

KNonlinearmamba
Total parameters: 504


predicting: 100%|██████████| 4/4 [00:15<00:00,  3.87s/times]


In [None]:
# Plot the log10 prediction error per step from the statistics saved by eval_err.
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']
font = {'size'   : 12}

plt.rc('font', **font)
# Other environments that can be plotted (set `env_name` in the evaluation cell):
# "DampingPendulum", "CartPole-v1", "MountainCarContinuous-v0", "Pendulum-v1", "Franka", "DoublePendulum"
title = env_name

# `steps` is only a local variable inside eval_err, so it has to be defined
# here as well; it must match the value eval_err used, because it is part of
# the result file name.
steps = 1000

# Which statistic to plot. Rows of the saved array are
# 0 = max_mean, 1 = max_std, 2 = mean_mean, 3 = mean_std.
# compare = "max"
compare = "mean"
row = {"max": 0, "mean": 2}[compare]

for i in [7]:
    file_path = "/media/serena/study/Vscode_works/python_Vscode/mamba/Nonlinear_LSPN/DATA/LSPN_predict_data/steps{}".format(steps)+suffix+env_name+"_"+Methods[i]+".npy"
    data = np.load(file_path)
    plt.plot(np.log10(data[row]),'*-',color = colors[i],label=Methods[i],linewidth=1.5)
plt.legend()
# plt.grid(linestyle='-.')
plt.xlabel("Steps",fontsize=12)
plt.ylabel("Log Err",fontsize=12)
plt.title(title,fontsize=15)
# plt.savefig(env_name+"_TEST_"+compare+"_new1.png",dpi=400)  # uncomment to export the figure
plt.show()

In [None]:
def draw_LSPN_only_eval_err(suffix,env_name,method_index):
    """Compute the log10 prediction error of one trained model for each test trajectory.

    The checkpoint is looked up in the ``Mamba_data_raw/<suffix>`` folder as a
    file whose name starts with ``<method>_<env_name>`` and ends with ``.pth``.
    One test set of 50 trajectories x 50 steps is collected, then ``lka.K_loss``
    is evaluated on every trajectory separately. Each single-trajectory slice is
    also written to ``LSPN_predict_data/middle_data/``, and the resulting matrix
    of log10 mean losses is saved to
    ``LSPN_predict_data/draw_LSPN_only<Samples><suffix><env_name>_<method>.npy``.

    Args:
        suffix: Name of the checkpoint sub-folder; also embedded in the output file names.
        env_name: Environment name passed to ``data_collecter``.
        method_index: Index into the module-level ``Methods`` list.

    Returns:
        Array of shape ``(Samples, steps)`` holding ``log10`` of the second
        value returned by ``lka.K_loss`` for each trajectory (presumably the
        per-step mean error - confirm against the learner module).

    Raises:
        ValueError: If the selected method has no learner module wired up here.
        FileNotFoundError: If no matching ``.pth`` checkpoint exists in the folder.
    """
    method = Methods[method_index]
    root_path = "/media/serena/study/Vscode_works/python_Vscode/mamba/Nonlinear_LSPN/DATA/Mamba_data_raw/"+suffix
    print(method)
    # Each method has its own learner module that provides K_loss (and, for
    # most methods, the Network class); import only the one that is needed.
    if method.endswith("KNonlinear"):
        import Learn_Knonlinear as lka
    elif method.endswith("KNonlinearRNN"):
        import Learn_Knonlinear_RNN as lka
    elif method.endswith("KoopmanNonlinear"):
        import Learn_KoopmanNonlinear_with_KlinearEig as lka
    elif method.endswith("KoopmanNonlinearA"):
        import Learn_KoopmanNonlinearA_with_KlinearEig as lka
    elif method.endswith("KoopmanU"):
        import Learn_Koopman_with_KlinearEig as lka
    elif method.endswith("KNonlinearmamba"):
        import Learn_Knonlinear_mamba as lka
    else:
        # Previously this fell through and crashed later on an unbound name.
        raise ValueError(f"draw_LSPN_only_eval_err does not support method {method!r}")

    # Locate the checkpoint. If several files match, the last one in directory
    # listing order is used (unchanged from the previous behaviour).
    prefix = method+"_"+env_name
    model_path = None
    for fname in os.listdir(root_path):
        if fname.startswith(prefix) and fname.endswith(".pth"):
            model_path = fname
    if model_path is None:
        raise FileNotFoundError(f"no checkpoint matching '{prefix}*.pth' found in {root_path}")

    Data_collect = data_collecter(env_name)
    udim = Data_collect.udim
    Nstates = Data_collect.Nstates
    layer_width = 128
    # NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
    dicts = torch.load(root_path+"/"+model_path,map_location=torch.device('cpu'))
    state_dict = dicts["model"]

    # Rebuild the architecture, using the layer sizes stored in the checkpoint where available.
    if method.endswith("KNonlinear"):
        Elayer = dicts["Elayer"]
        net = lka.Network(layers=Elayer,u_dim=udim)
    elif method.endswith("KNonlinearRNN"):
        net = lka.Network(input_size=udim+Nstates,output_size=Nstates,hidden_dim=layer_width, n_layers=3)
    elif method.endswith("KNonlinearmamba"):
        net = LSPN_Mamba(
            # This module uses roughly 3 * expand * d_model^2 parameters
            d_model=3,   # Model dimension d_model
            d_state=8,   # SSM state expansion factor
            d_conv=4,    # Local convolution width
            expand=4,    # Block expansion factor
        ).to("cuda")
    elif method.endswith("KoopmanNonlinear") or method.endswith("KoopmanNonlinearA"):
        layer = dicts["layer"]
        blayer = dicts["blayer"]
        NKoopman = layer[-1]+Nstates
        net = lka.Network(layer,blayer,NKoopman,udim)
    elif method.endswith("KoopmanU"):
        layer = dicts["layer"]
        NKoopman = layer[-1]+Nstates
        net = lka.Network(layer,NKoopman,udim)
    net.load_state_dict(state_dict)
    total_params = sum(p.numel() for p in net.parameters())
    print(f"Total parameters: {total_params}")
    net.double()

    Samples = 50
    steps = 50
    # Fixed seeds so the randomly generated test data is repeatable.
    random.seed(2022)
    np.random.seed(2022)
    max_loss_all = np.zeros((Samples,steps))
    mean_loss_all = np.zeros((Samples,steps))
    with torch.no_grad():
        test_data_all = Data_collect.collect_koopman_data(Samples,steps)
        for i in trange(Samples, desc="predicting", unit="Samples"):
            # Keep the middle axis (length 1) so K_loss still receives a 3-D array.
            # Assumes axis 1 of the collected data indexes trajectories - TODO confirm.
            test_data = test_data_all[:, i:i+1, :]
            np.save("/media/serena/study/Vscode_works/python_Vscode/mamba/Nonlinear_LSPN/DATA/LSPN_predict_data/middle_data/draw_LSPN_only{}{}{}.npy".format(i,suffix,env_name),test_data)
            max_loss,mean_loss = lka.K_loss(test_data,net,udim)
            max_loss,mean_loss = np.log10(max_loss), np.log10(mean_loss)
            max_loss_all[i] = max_loss.reshape(-1)
            mean_loss_all[i] = mean_loss.reshape(-1)
    print(max_loss_all.shape)
    print(mean_loss_all.shape)
    # Only the mean-loss matrix is persisted and returned; the max-loss matrix is computed but not saved.
    np.save("/media/serena/study/Vscode_works/python_Vscode/mamba/Nonlinear_LSPN/DATA/LSPN_predict_data/draw_LSPN_only{}".format(Samples)+suffix+env_name+"_"+method+".npy",np.array(mean_loss_all))
    return mean_loss_all

In [None]:
# Checkpoint sub-folder to evaluate (alternatives kept for quick switching).
suffix = "mamba_test_50_5k_2w"       # (author's note: 6,8,9)
# suffix = "mamba_dampingpendulum"

# Environment to evaluate on.
# env_name = "DampingPendulum"
env_name = "MountainCarContinuous-v0"
# env_name = "Pendulum-v1"

# Index 7 in `Methods` is "KNonlinearmamba"; add more indices to evaluate other methods.
for i in (7,):
    draw_LSPN_only_eval_err(suffix, env_name, method_index=i)
