In [1]:
import wshrRelabelLight as WRL
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler

# Root folder that holds the exception recordings (relative to the working directory).
# Alternative data source, kept for reference:
# download_save_path = 'E:/Dataset/wind_shear/Data_Download'
exception_save_path = '../Dataset/Exception_Data'

# Names of the immediate sub-directories of the root folder; plain files are skipped.
# download_folder_names = [item for item in os.listdir(download_save_path) if os.path.isdir(os.path.join(download_save_path, item))]
exception_folder_names = [
    entry_name
    for entry_name in os.listdir(exception_save_path)
    if os.path.isdir(os.path.join(exception_save_path, entry_name))
]
instruction_folder_names = ["@Instructions"]

# Full path of every sub-directory, in the same order as the names above.
# download_folder_paths = [os.path.join(download_save_path, item) for item in download_folder_names]
exception_folder_paths = [
    os.path.join(exception_save_path, folder_name)
    for folder_name in exception_folder_names
]

# Names of the recorded variables to read; this list is handed to
# WRL.dataConstruct when the data sets are built.
# English rendering of the note kept in the string literal below:
#   CTSO simulator output variables: TIME, ALT, HDOT, VT, ALPHA, GAMMA, PITCH,
#   GREF, WXDT, WZ, VDOT, ALRT. There is no raw data for WXDT and VDOT, and the
#   meaning of GREF is unknown.
'''
CTSO仿真器输出变量：TIME, ALT, HDOT, VT, ALPHA, GAMMA, PITCH, GREF, WXDT, WZ, VDOT, ALRT
没有WXDT和VDOT的原始数据，GREF不知道什么意思
'''
variable_list = [
    'ALT', 'ALTR', 'TAS', 'GS',
    'AOA1', 'AOA2', 'PTCH',
    'WS', 'WD',
    'SAT', 'TAT',
    'PI', 'PT',
]

# Build the training set and the test set, each from one recording file.
# NOTE(review): os.listdir() returns entries in arbitrary order, so the
# hard-coded positions below may pick different folders/files on another
# machine or file system -- confirm the intended recordings.
train_folder_path = exception_folder_paths[3]
train_mat_name = os.listdir(train_folder_path)[2]
train_X, train_Y = WRL.dataConstruct(
    train_folder_path,
    train_mat_name,
    variable_list,
    normalized=False,
)

test_folder_path = exception_folder_paths[1]
test_mat_name = os.listdir(test_folder_path)[0]
test_X, test_Y = WRL.dataConstruct(
    test_folder_path,
    test_mat_name,
    variable_list,
    normalized=False,
)

# Row indices where the first label column equals 0
# (presumably the wind-shear warning samples, going by the names -- TODO confirm).
train_wshr_warn_idx_list = np.where(train_Y[:, 0] == 0)[0]
test_wshr_warn_idx_list = np.where(test_Y[:, 0] == 0)[0]

def _flight_path_angle(X, names):
    """Return the per-sample flight path angle: pitch minus mean angle of attack.

    Args:
        X: 2-D array of samples; assumes its columns follow the order of
            ``names`` (TODO confirm against WRL.dataConstruct).
        names: list of column names containing 'AOA1', 'AOA2' and 'PTCH'.

    Returns:
        1-D array with one value per row of ``X``.

    Raises:
        ValueError: if one of the required names is missing from ``names``.
    """
    aoa_cols = [names.index('AOA1'), names.index('AOA2')]
    pitch_col = names.index('PTCH')
    # axis=1 averages the two angle-of-attack readings row by row. The previous
    # form, np.mean(X[:, 4:5]), covered only one column and reduced it to a
    # single scalar, so the result was just the pitch shifted by a constant.
    return X[:, pitch_col] - np.mean(X[:, aoa_cols], axis=1)


# Compute the flight path angle and register it as an extra variable.
variable_list.append('GAMMA')
train_Gamma_X = _flight_path_angle(train_X, variable_list)
test_Gamma_X = _flight_path_angle(test_X, variable_list)

# Append the new variable as the last column of both data sets.
train_X = np.hstack((train_X, train_Gamma_X.reshape(-1, 1)))
test_X = np.hstack((test_X, test_Gamma_X.reshape(-1, 1)))

# # Keep only the take-off phase, i.e. samples with altitude between 50 and 2000 ft
# train_X = train_X[np.where((train_X[:int(train_X.shape[0]/2), 0] >= 50) & (train_X[:int(train_X.shape[0]/2), 0] <= 2000))[0]]
# test_X = test_X[np.where((test_X[:int(test_X.shape[0]/2), 0] >= 50) & (test_X[:int(test_X.shape[0]/2), 0] <= 2000))[0]]
# train_X = train_X[:5000, :]
print(train_X.shape, test_X.shape)

# Standardise both data sets. The scaler is fitted on the training set only
# and then applied unchanged to the test set.
s_scaler = StandardScaler()
# Keep a handle on the unscaled test data; it is plotted as ground truth later.
test_X_origin = test_X
train_X = s_scaler.fit_transform(train_X)
test_X = s_scaler.transform(test_X)
# Per-column statistics of the training set, used later to undo the scaling.
train_mean = s_scaler.mean_
train_std = s_scaler.scale_


# Ljung-Box test for serial autocorrelation in each series (a white-noise check, not a stationarity test)
from statsmodels.stats.diagnostic import acorr_ljungbox

# for i in range(train_X.shape[1]):
#     # Print the Ljung-Box statistic
#     print(acorr_ljungbox(train_X[:, i], lags=15))
#     '''
#     p-value = 0 << 0.05: significant autocorrelation
#     '''


# Build the vector autoregression (VAR) model
import pandas as pd
from statsmodels.tsa.api import VAR
import plotly.graph_objects as go

# Fit the VAR model on the standardised training set.
# `decay_steps` is handed to fit() as the maximum lag order; the forecasting
# code further down also uses it as the length of each input window.
decay_steps = 5
model = VAR(train_X)
results = model.fit(maxlags=decay_steps)

# Show the shape of the fitted coefficient array.
VAR_params = results.params
print(VAR_params.shape)
# print(var_params)

# Rolling multi-step forecast over the standardised test set.
#
# Every window of `decay_steps` consecutive rows is forecast `pred_steps`
# ahead. The first window contributes all of its forecast rows; each later
# window contributes only its final (pred_steps-ahead) row. Together with the
# first `decay_steps` rows, which are copied from test_X because nothing
# precedes them, the result has exactly as many rows as test_X.
pred_steps = 15
n_windows = test_X.shape[0] - pred_steps - decay_steps + 1
if n_windows < 1:
    # Fail early with a clear message instead of multiplying None further down.
    raise ValueError(
        f'test_X has {test_X.shape[0]} rows, but at least '
        f'{pred_steps + decay_steps} are required for a {pred_steps}-step '
        f'forecast from {decay_steps} lagged rows'
    )

# Collect the pieces in a list and stack once; calling np.vstack inside the
# loop would re-copy the whole growing array on every iteration.
pred_rows = [test_X[:decay_steps]]
for start in range(n_windows):
    pred_results = results.forecast(test_X[start:start + decay_steps], steps=pred_steps)
    pred_rows.append(pred_results if start == 0 else pred_results[-1:, :])
test_X_pred = np.vstack(pred_rows)

# Undo the standardisation with the training-set mean and scale so the
# predictions are back in the original units.
test_X_pred = test_X_pred * train_std + train_mean

# Interactive visualisation: one figure per variable, showing the unscaled
# test data ("GT") against the de-normalised VAR prediction ("pred").
for idx, var_name in enumerate(variable_list):
    truth_trace = go.Scatter(
        y=test_X_origin[:, idx],
        mode='lines',
        name='GT',
        line={'color': 'blue'},
    )
    pred_trace = go.Scatter(
        y=test_X_pred[:, idx],
        mode='lines',
        name='pred',
        line={'color': 'red'},
    )

    fig = go.Figure()
    fig.add_trace(truth_trace)
    fig.add_trace(pred_trace)
    fig.update_layout(
        title=f'VAR Prediction vs. Ground Truth for {var_name}',
        xaxis_title='Time Step',
        yaxis_title='Value',
    )
    fig.show()

# Persist the fitted VAR results to disk.
model_save_path = '../result/variable_timeSeries/forecast/VAR_model_652200204251941.pkl'
# Create the output directory first so that a missing folder does not throw
# away the fitted model with a FileNotFoundError at the very last step.
os.makedirs(os.path.dirname(model_save_path), exist_ok=True)
# NOTE(review): the .pkl suffix suggests a pickle file; if so, only load it
# back from trusted locations.
results.save(model_save_path)

(7852, 14) (10460, 14)
(71, 14)
