In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.layers import Input, Dense, LSTM, Conv1D, Dropout, Bidirectional, Multiply, Flatten, MaxPooling1D
from keras.models import Model
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import backend as K
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
import seaborn as sns
from tensorflow.keras.layers import Concatenate
from keras.layers import Lambda

In [None]:
# 设置全局参数
TIME_STEPS = 4
MODEL_PATH = './model.h5'  

# 函数：创造时间序列数据集
def create_dataset_with_lookback_and_skip(dataset, look_back=TIME_STEPS):
    dataX, dataY = [], []
    for i in range(0, len(dataset) - look_back + 1, look_back):
        a = dataset[i:(i + look_back), 1:]
        dataX.append(a)
        dataY.append(dataset[i + look_back - 1, 0])
    return np.array(dataX), np.array(dataY)

# 注意力机制
def attention_3d_block(inputs):
    input_dim = int(K.int_shape(inputs)[2])
    a = Dense(input_dim, activation='softmax')(inputs) 
    a_probs = Multiply()([inputs, a])
    return a_probs

# 创建带有注意力机制的BiLSTM模型
def build_model(lstm_units, dropout, input_dims):
    inputs = Input(shape=(TIME_STEPS, input_dims))
    x = Conv1D(filters=64, kernel_size=4, strides=1, activation='relu', padding='same')(inputs)
    x = MaxPooling1D(pool_size=1)(x)
    x = Dropout(dropout)(x)
    lstm_out = Bidirectional(LSTM(lstm_units, return_sequences=True))(x)
    lstm_out = Dropout(dropout)(lstm_out)
    attention_mul = attention_3d_block(lstm_out)
    attention_mul = Flatten()(attention_mul)
    output = Dense(1, activation='linear')(attention_mul)
    model = Model(inputs=inputs, outputs=output)
    model.compile(loss='mse', optimizer='adam', metrics=['mae'])
    return model

# 贝叶斯超参数优化目标函数
def train_and_evaluate_model(lstm_units, dropout, batch_size, epochs):
    lstm_units = int(round(lstm_units))  # 确保 LSTM 单元数为整数
    batch_size = int(round(batch_size / 2) * 2)  # 批量大小调整为偶数
    epochs = int(round(epochs))  # 确保 epoch 数为整数
    dropout = round(dropout, 1)  # 保留一位小数
    K.clear_session()  # 清除当前会话，避免内存泄漏

    # 构建模型
    model = build_model(
        lstm_units=lstm_units,
        dropout=dropout,
        input_dims=train_X.shape[2]  # 仅使用动态特征的输入维度
    )

    # 定义回调函数
    early_stopping = EarlyStopping(monitor='val_loss', patience=8, mode='min')
    model_checkpoint = ModelCheckpoint(MODEL_PATH.format(datetime.now().strftime('%Y%m%d_%H%M%S')),
                                        save_best_only=True, monitor='val_loss', mode='min')

    # 模型训练
    history = model.fit(
        train_X,  # 使用训练集的动态特征
        train_Y,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.1,
        callbacks=[early_stopping, model_checkpoint],
        verbose=0
    )
    val_loss = min(history.history['val_loss'])  # 获取验证集的最小损失值
    return -val_loss  # 贝叶斯优化需要最大化目标值，因此取负

# 模型训练函数
def train_model(train_X, train_Y):
    model = build_model(INPUT_DIMS)
    model.compile(loss='mse', optimizer='adam', metrics=['mae'])
    
    # 回调函数
    early_stopping = EarlyStopping(monitor='val_loss', patience=40, mode='min')
    model_checkpoint = ModelCheckpoint(MODEL_PATH.format(datetime.now().strftime('%Y%m%d_%H%M%S')),
                                        save_best_only=True, monitor='val_loss', mode='min')
    
    # 模型训练
    history = model.fit(
        train_X, train_Y,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        validation_split=0.1,
        callbacks=[early_stopping, model_checkpoint]
    )
    return model, history

# 评估模型并计算 R² 和 RMSE
def evaluate_model(model, X, Y, dataset_type='Test'):
    results = model.predict(X).flatten()  # 确保输出为 1D
    r2 = r2_score(Y, results)
    rmse = np.sqrt(mean_squared_error(Y, results))
    print(f'{dataset_type} R^2: {r2:.4f}, RMSE: {rmse:.4f}')
    return results, r2, rmse

# 数据归一化和反归一化
def normalize_data(data):
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data)
    return scaled_data, scaler

def inverse_normalize(data, scaler):
    return scaler.inverse_transform(data)

# 绘制训练曲线
def plot_history(history):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    if 'mae' in history.history:
        plt.subplot(1, 2, 2)
        plt.plot(history.history['mae'], label='Train MAE')
        plt.plot(history.history['val_mae'], label='Validation MAE')
        plt.title('Model MAE')
        plt.xlabel('Epoch')
        plt.ylabel('MAE')
        plt.legend()
    plt.tight_layout()
    plt.show()
    
# 反归一化预测值和实际值
def inverse_target_only(data, scaler):
    # 仅反归一化目标列，假设目标列是第一个
    dummy_data = np.zeros((len(data), scaler.scale_.shape[0]))
    dummy_data[:, 0] = data.flatten()
    return scaler.inverse_transform(dummy_data)[:, 0]
def inverse_and_export_results(predictions, actuals, scaler, output_path):
    # 反归一化
    predictions_inverse = inverse_target_only(predictions, scaler)
    actuals_inverse = inverse_target_only(actuals, scaler)
    
    # 创建 DataFrame 保存结果
    results_df = pd.DataFrame({
        'Predicted': predictions_inverse,
        'Actual': actuals_inverse
    })
    
    # 导出到 CSV 文件
    results_df.to_csv(output_path, index=False)
    print(f"Results exported to {output_path}")

In [None]:
# 取本地的训练集和测试集数据
train_data = pd.read_excel(r"F:\Data\Study\DataSet\Project\1.Linan\M_data\Prediction\La_15-18_30m_Train.xlsx")
test_data = pd.read_excel(r"F:\Data\Study\DataSet\Project\1.Linan\M_data\Prediction\La_15-18_30m_Test.xlsx")
train_df = train_data.drop(['FID','YSSZ','TIME'], axis=1)
test_df = test_data.drop(['FID','YSSZ','TIME'], axis=1)

# 使用随机森林计算特征重要性
target_column = 'AGB'  # 假设目标变量为 'AGB'
X_train = train_df.drop(columns=[target_column])
y_train = train_df[target_column]

X_test = test_df.drop(columns=[target_column])
y_test = test_df[target_column]

rf = RandomForestRegressor(random_state=42, n_estimators=100)
rf.fit(X_train, y_train)

# 提取特征重要性
feature_importances = pd.DataFrame({
    'Feature': X_train.columns,
    'Importance': rf.feature_importances_
}).sort_values(by='Importance', ascending=False)

# 打印特征重要性
print("Feature Importance from Random Forest:")
print(feature_importances)

# 绘制特征重要性柱状图
plt.figure(figsize=(9, 7))
sns.barplot(x='Importance', y='Feature', data=feature_importances)
plt.title('Feature Importances from Random Forest')
plt.show()

In [None]:
# 构建特征重要性表格，并取前10个特征
feature_importances = pd.DataFrame({
    'Feature': X_train.columns,
    'Importance': rf.feature_importances_
}).sort_values(by='Importance', ascending=False)

# === 4. 取前 10 个特征 ===
top_features = feature_importances.head(10)

print("Top 10 Important Features:")
print(top_features)

# === 5. 绘图展示 ===
plt.figure(figsize=(3, 2.5), dpi=200)
ax = sns.barplot(x='Feature', y='Importance', data=top_features, color='teal')

# 设置标题与坐标轴
plt.title('Annual Module Feature Importance (Linan)', fontsize=7.5, fontweight='light')
plt.xlabel('', fontsize=5.5)
plt.ylabel('', fontsize=5.5)
ax.tick_params(axis='x', labelsize=5.5, rotation=45)  # 旋转特征名，防止重叠
ax.tick_params(axis='y', labelsize=5.5)

# 网格线 + 布局
ax.grid(True, linestyle='', alpha=0.7)
plt.tight_layout()
plt.show()

In [None]:
# 设置特征重要性阈值，选择重要性高于阈值的特征
top_k_features = 10  # 选择前n个特征
selected_features = feature_importances['Feature'].iloc[:top_k_features].tolist()
print("Selected features based on importance threshold:", selected_features)

# 根据随机森林选择的特征构建数据集
if selected_features:
    train_df = train_df[[target_column] + selected_features]
    test_df = test_df[[target_column] + selected_features]
else:
    raise ValueError("No significant features selected.")

# 归一化训练集和测试集
train_scaled, scaler = normalize_data(train_df.values)
test_scaled = scaler.transform(test_df.values)  # 使用训练集的scaler对测试集进行归一化

# 创建时间序列数据集
train_X, train_Y = create_dataset_with_lookback_and_skip(train_scaled, TIME_STEPS)
test_X, test_Y = create_dataset_with_lookback_and_skip(test_scaled, TIME_STEPS)

# 打印数据集形状
print(f'Train X shape: {train_X.shape}, Train Y shape: {train_Y.shape}')
print(f'Test X shape: {test_X.shape}, Test Y shape: {test_Y.shape}')

# 设置输入维度
INPUT_DIMS = len(selected_features)

In [None]:
# 定义参数搜索空间，并为 batch_size 设置合理步长
pbounds = {
    'lstm_units': (32, 96),
    'dropout': (0.0, 0.0),
    'batch_size': (32, 64),
    'epochs': (50, 100)
}

# 定义开关变量
use_bayesian_optimization = False  # 如果为 True，则使用贝叶斯优化；为 False 则使用自定义参数

if use_bayesian_optimization:
    # 执行贝叶斯优化
    optimizer = BayesianOptimization(
        f=train_and_evaluate_model,
        pbounds=pbounds,
        random_state=42
    )

    optimizer.maximize(init_points=4, n_iter=14)

    # 获取最佳参数
    best_params = optimizer.max['params']
    best_params['batch_size'] = int(round(best_params['batch_size'] / 2) * 2)
    best_params['epochs'] = int(round(best_params['epochs']))
    best_params['lstm_units'] = int(round(best_params['lstm_units']))
    best_params['dropout'] = round(best_params['dropout'], 1)

    print("Best Parameters from Bayesian Optimization:", best_params)
else:
    # 使用自定义参数
    best_params = {
        'lstm_units': 64,
        'dropout': 0.0,
        'batch_size': 64,
        'epochs': 100
    }
    print("Using custom parameters:", best_params)

In [None]:
# 构建模型
final_model = build_model(
    lstm_units=int(best_params['lstm_units']),
    dropout=round(best_params['dropout'], 1),
    input_dims=INPUT_DIMS
)

# 模型训练
final_history = final_model.fit(
    train_X, train_Y,
    epochs=int(best_params['epochs']),
    batch_size=int(best_params['batch_size']),
    validation_split=0.1,
    callbacks=[EarlyStopping(monitor='val_loss', patience=40, mode='min')],
    verbose=1
)

# 训练集和测试集评估
train_results, train_r2, train_rmse = evaluate_model(final_model, train_X, train_Y, dataset_type='Train')
test_results, test_r2, test_rmse = evaluate_model(final_model, test_X, test_Y, dataset_type='Test')

# 调用 plot_history 显示训练过程中的损失曲线
plot_history(final_history)

In [None]:
# 模型预测
train_predictions = final_model.predict([train_X])
test_predictions = final_model.predict([test_X])

# 反归一化训练集的预测结果和实际值
train_predictions_inverse = inverse_target_only(train_predictions, scaler)
train_actuals_inverse = inverse_target_only(train_Y, scaler)

# 反归一化测试集的预测结果和实际值
test_predictions_inverse = inverse_target_only(test_predictions, scaler)
test_actuals_inverse = inverse_target_only(test_Y, scaler)

# 计算反归一化后的 RMSE 和 MAE
train_rmse_inverse = np.sqrt(mean_squared_error(train_actuals_inverse, train_predictions_inverse))
train_mae_inverse = mean_absolute_error(train_actuals_inverse, train_predictions_inverse)
test_rmse_inverse = np.sqrt(mean_squared_error(test_actuals_inverse, test_predictions_inverse))
test_mae_inverse = mean_absolute_error(test_actuals_inverse, test_predictions_inverse)

# 打印反归一化后的性能指标
print(f'Train RMSE (Inverse Scaled): {train_rmse_inverse:.4f}')
print(f'Train MAE (Inverse Scaled): {train_mae_inverse:.4f}')
print(f'Test RMSE (Inverse Scaled): {test_rmse_inverse:.4f}')
print(f'Test MAE (Inverse Scaled): {test_mae_inverse:.4f}')

# 绘制测试集预测与真实值对比（反归一化后）
plt.figure(figsize=(7, 5))
plt.plot(test_predictions_inverse, label='Predicted - Test', color='blue')
plt.plot(test_actuals_inverse, label='Actual - Test', color='orange')
plt.title('Prediction vs Actual (Test Set) - Inverse Scaled')
plt.xlabel('Step')
plt.ylabel('Value')
plt.legend()
plt.show()

# 绘制测试集散点图（反归一化后）
plt.figure(figsize=(6, 5))
plt.scatter(test_predictions_inverse, test_actuals_inverse, label='Data Points', color='green', alpha=0.6)
sns.regplot(x=test_predictions_inverse, y=test_actuals_inverse, scatter=False, label='Fit Line', color='red')
plt.title('Fitted Curve: Predicted vs Actual (Test Set) - Inverse Scaled')
plt.xlabel('Predicted Value (Inverse Scaled)')
plt.ylabel('Actual Value (Inverse Scaled)')
plt.legend()
plt.show()