數據組可以做一些增強,詳細原理可問GPT,以下為代碼示例

In [None]:
# Augmentation parameters
noise_levels = [0.01, 0.05, 0.1]
scale_factors = [0.9, 1.1, 1.2]
sequence_lengths = [8, 10, 12]
scale_ranges = [(0.9, 1.1), (0.8, 1.2)]

# List of augmented copies, starting with the original data
augmented_data_list = [data_array]

# Noise jitter
for noise_level in noise_levels:
    augmented_data_list.append(add_noise(data_array, noise_level=noise_level))

# Time scaling
# NOTE(review): time_scaling is not defined in the visible part of this
# notebook; it has to be provided before this cell can run.
for scale_factor in scale_factors:
    augmented_data_list.append(time_scaling(data_array, scale_factor=scale_factor))

# Sliding window
for length in sequence_lengths:
    X_var, y_var = create_sequences(data_array, length)
    # Flattening the windows yields n_windows * length feature rows, while
    # y_var holds only one target per window. Repeat each window's target
    # `length` times so both pieces have the same number of rows; otherwise
    # np.hstack raises a ValueError.
    window_rows = X_var.numpy().reshape(-1, X_var.shape[2])
    window_targets = np.repeat(y_var.numpy().reshape(-1, 1), length, axis=0)
    augmented_data_list.append(np.hstack([window_rows, window_targets]))

# Feature perturbation
for scale_range in scale_ranges:
    augmented_data_list.append(feature_scaling(data_array, scale_range=scale_range))

# Merge all augmented copies row-wise
augmented_data = np.vstack(augmented_data_list)


以下為改進代碼示例,加入數據增強、Optuna、第 2 步驟模型與 Early Stopping;初始超參請參考 Sunny 提供的初始代碼。

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import optuna
import matplotlib.pyplot as plt

# ====================
# 1. 數據準備與增強
# ====================
# 加載數據
data_0 = pd.read_csv('./test2.csv')
# Keep only the first run of digits of each 'Cycle' entry and store it as int.
# A raw string avoids the invalid-escape warning for "\d", and expand=False
# returns a Series rather than a one-column DataFrame.
data_0['Cycle'] = data_0['Cycle'].str.extract(r'(\d+)', expand=False).astype(int)

# Standardize the selected columns; the last column is what create_sequences
# later treats as the prediction target.
feature_columns = ['Cycle', 'C1ch_Max_Charge', 'C1ch_Min_Charge', 'C1ch_Mean_Charge', 'C1ch_Charge_Diff']
scaler = StandardScaler()
data_array = scaler.fit_transform(data_0[feature_columns])

# 創建序列數據
def create_sequences(data, sequence_length):
    """Turn a 2-D array into sliding-window samples for sequence regression.

    Window ``i`` uses rows ``i .. i + sequence_length - 1`` of every column
    except the last as input, and the last column of row
    ``i + sequence_length`` as its target.

    Args:
        data: 2-D array-like of shape (n_rows, n_columns).
        sequence_length: Number of consecutive rows per input window.

    Returns:
        Tuple ``(X, y)`` of float32 tensors with shapes
        ``(n_windows, sequence_length, n_columns - 1)`` and ``(n_windows, 1)``,
        where ``n_windows = max(n_rows - sequence_length, 0)``. The shapes
        stay 3-D / 2-D even when there are no windows, so ``X.shape[2]``
        is always valid.
    """
    data = np.asarray(data)
    inputs = data[:, :-1]
    targets = data[:, -1]
    n_windows = max(len(data) - sequence_length, 0)

    # Fill one preallocated array instead of handing torch a Python list of
    # ndarrays, which is slow and triggers a UserWarning.
    X = np.empty((n_windows, sequence_length, inputs.shape[1]), dtype=np.float32)
    for i in range(n_windows):
        X[i] = inputs[i:i + sequence_length]
    y = targets[sequence_length:sequence_length + n_windows].astype(np.float32)

    return torch.from_numpy(X), torch.from_numpy(y).unsqueeze(1)

# 數據增強
def add_noise(data, noise_level=0.01):
    """Return ``data`` plus zero-mean Gaussian noise.

    Args:
        data: Array whose ``shape`` determines the shape of the noise.
        noise_level: Standard deviation of the noise.

    Returns:
        A new array ``data + noise``; ``data`` itself is not modified.
    """
    jitter = np.random.normal(loc=0, scale=noise_level, size=data.shape)
    return data + jitter

def feature_scaling(data, scale_range=(0.9, 1.1)):
    """Multiply every column of ``data`` by its own random factor.

    Args:
        data: 2-D array; one factor is drawn per column (``data.shape[1]``).
        scale_range: ``(low, high)`` bounds of the uniform distribution the
            factors are drawn from.

    Returns:
        A new array with each column scaled by its factor.
    """
    n_columns = data.shape[1]
    column_factors = np.random.uniform(
        low=scale_range[0],
        high=scale_range[1],
        size=n_columns,
    )
    return data * column_factors

# 增強數據整合
augmented_data_list = [
    data_array,  # original data
    add_noise(data_array, noise_level=0.05),  # added noise
    feature_scaling(data_array, scale_range=(0.8, 1.2)),  # per-feature scaling
]
augmented_data = np.vstack(augmented_data_list)  # all augmented copies stacked row-wise

# Build the sequences copy by copy and concatenate the results. Windowing the
# stacked array instead would create windows that straddle the seam between
# two copies, i.e. jump from the last rows of one copy to the first rows of
# the next.
sequence_length = 10
sequence_pairs = [
    create_sequences(augmented_copy, sequence_length)
    for augmented_copy in augmented_data_list
]
X_tensor = torch.cat([X_part for X_part, _ in sequence_pairs], dim=0)
y_tensor = torch.cat([y_part for _, y_part in sequence_pairs], dim=0)

# Split into training and validation sets.
# NOTE(review): the split is random and happens after augmentation, so the
# validation set contains overlapping windows and augmented near-duplicates of
# training windows. The validation loss is therefore likely optimistic;
# consider splitting chronologically before augmenting.
X_train, X_val, y_train, y_val = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=42)

# ====================
# 2. LSTM 模型定義
# ====================
class LSTMModel(nn.Module):
    """LSTM regressor that predicts one value from the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers. It is only
            applied when ``num_layers > 1``; PyTorch warns about (and cannot
            apply) inter-layer dropout on a single-layer LSTM.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map a batch-first input (batch, seq_len, input_size) to (batch, 1)."""
        out, _ = self.lstm(x)
        # Only the output at the final time step feeds the linear head.
        return self.fc(out[:, -1, :])

# ====================
# 3. Early Stopping 機制
# ====================
class EarlyStopping:
    """Signal when the validation loss has stopped improving.

    Args:
        patience: Number of consecutive non-improving calls tolerated before
            stopping is requested.
        delta: Minimum decrease below the best loss that counts as an
            improvement.
    """

    def __init__(self, patience=10, delta=0):
        self.patience = patience
        self.delta = delta
        self.counter = 0  # consecutive calls without improvement
        self.best_loss = None  # lowest loss seen so far
        self.early_stop = False  # becomes True once patience is exhausted

    def __call__(self, val_loss):
        """Record ``val_loss`` and return True when training should stop.

        Returning the flag lets callers write ``if early_stopping(loss):``;
        without a return value that condition would never be true.
        """
        if self.best_loss is None or val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        return self.early_stop

# ====================
# 4. 用 Optuna 調整 LSTM 模型參數
# ====================
def objective(trial):
    """Optuna objective: train an LSTM briefly and return its validation loss.

    Samples ``hidden_size``, ``num_layers`` and ``lr`` from the trial, trains
    an ``LSTMModel`` for 10 epochs on ``X_train`` / ``y_train`` and returns
    the mean per-batch MSE on ``X_val`` / ``y_val`` after the last epoch.

    Args:
        trial: The ``optuna`` trial used to sample hyperparameters.

    Returns:
        Mean validation MSE per batch after the final epoch.
    """
    hidden_size = trial.suggest_int('hidden_size', 32, 128, step=32)
    num_layers = trial.suggest_int('num_layers', 1, 3)
    # suggest_loguniform is deprecated; suggest_float(..., log=True) samples
    # from the same log-uniform range.
    lr = trial.suggest_float('lr', 1e-4, 1e-2, log=True)

    model = LSTMModel(input_size=X_tensor.shape[2], hidden_size=hidden_size, num_layers=num_layers)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=32, shuffle=False)

    avg_val_loss = float('inf')
    for epoch in range(10):
        # Training pass
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            predictions = model(X_batch)
            loss = criterion(predictions, y_batch)
            loss.backward()
            optimizer.step()

        # Validation pass
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                val_predictions = model(X_batch)
                val_loss += criterion(val_predictions, y_batch).item()
        avg_val_loss = val_loss / len(val_loader)
        # Record the per-epoch value so the study keeps the learning curve.
        trial.report(avg_val_loss, epoch)

    return avg_val_loss

# 運行 Optuna
# Search for the hyperparameters that minimize the objective's return value.
n_search_trials = 20
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=n_search_trials)

# Keep the winning hyperparameters for the final training run and show them.
best_params = study.best_params
print("最佳參數:", best_params)

# ====================
# 5. 用最佳參數訓練 LSTM
# ====================
final_model = LSTMModel(input_size=X_tensor.shape[2], hidden_size=best_params['hidden_size'], num_layers=best_params['num_layers'])
optimizer = optim.Adam(final_model.parameters(), lr=best_params['lr'])
criterion = nn.MSELoss()
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=32, shuffle=False)

# Train for up to 50 epochs with early stopping on the validation loss
early_stopping = EarlyStopping(patience=10)
loss_history, val_loss_history = [], []
for epoch in range(50):
    # Training pass: accumulate the per-batch loss to report the epoch mean
    final_model.train()
    epoch_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        predictions = final_model(X_batch)
        loss = criterion(predictions, y_batch)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    loss_history.append(epoch_loss / len(train_loader))

    # Validation pass
    final_model.eval()
    with torch.no_grad():
        val_loss = 0
        for X_batch, y_batch in val_loader:
            val_predictions = final_model(X_batch)
            val_loss += criterion(val_predictions, y_batch).item()
        avg_val_loss = val_loss / len(val_loader)
        val_loss_history.append(avg_val_loss)

    print(f"Epoch {epoch+1}, Train Loss: {loss_history[-1]:.4f}, Val Loss: {avg_val_loss:.4f}")
    # Update the stopper, then read its flag. Testing the call's return value
    # is not reliable because EarlyStopping.__call__ may return None, in which
    # case the loop would never stop early.
    early_stopping(avg_val_loss)
    if early_stopping.early_stop:
        print(f"Early stopping at epoch {epoch+1}")
        break

# ====================
# 6. 可視化
# ====================
# 損失曲線
# Loss curves: one line per recorded history, with epochs numbered from 1
plt.figure(figsize=(10, 6))
loss_curves = (
    (loss_history, 'Training Loss', 'o'),
    (val_loss_history, 'Validation Loss', 'x'),
)
for curve_values, curve_label, curve_marker in loss_curves:
    epoch_axis = range(1, len(curve_values) + 1)
    plt.plot(epoch_axis, curve_values, label=curve_label, marker=curve_marker)
plt.title('Loss Curve')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()

# Predictions versus actual targets, computed over every sequence in X_tensor
# (this includes the sequences the model was trained on)
final_model.eval()
with torch.no_grad():
    predicted_tensor = final_model(X_tensor)
    test_predictions = predicted_tensor.squeeze().numpy()
    test_actual = y_tensor.squeeze().numpy()

plt.figure(figsize=(10, 6))
plt.plot(test_actual, label='Actual')
plt.plot(test_predictions, label='Predicted', linestyle='--')
plt.title('Actual vs Predicted')
plt.xlabel('Samples')
plt.ylabel('Target')
plt.legend()
plt.grid()
plt.show()

# Residual distribution (actual minus predicted)
residuals = test_actual - test_predictions
plt.figure(figsize=(10, 6))
plt.hist(residuals, bins=50, alpha=0.7)
plt.xlabel('Residual')
plt.ylabel('Frequency')
plt.title('Residual Distribution')
plt.grid()
plt.show()


以下為建議改進方向:第一階段模型處理後的數據可再做調整,例如降維以及特徵擴展(可根據我們的計算結果討論)

In [None]:
import optuna
from sklearn.decomposition import PCA

# 定義調參的目標函數
def objective(trial):
    """Optuna objective for the second-stage GRU on expanded features.

    Builds second-stage inputs from the last time step of ``X_tensor`` plus
    PCA-reduced (and optionally squared / interaction) first-stage outputs,
    trains a ``GRUModel`` for 10 epochs and returns the mean per-batch
    validation MSE.

    NOTE(review): ``first_stage_outputs`` and ``GRUModel`` are not defined in
    the visible part of this notebook. This assumes ``first_stage_outputs``
    has one row per row of ``X_tensor`` / ``y_tensor`` - TODO confirm.

    Args:
        trial: The ``optuna`` trial used to sample the feature options.

    Returns:
        Mean validation MSE per batch after training.
    """
    # Use one detached 2-D CPU float tensor for the first-stage outputs so the
    # same values can feed both scikit-learn (PCA) and torch.cat.
    first_stage = torch.as_tensor(first_stage_outputs, dtype=torch.float32).detach().cpu()
    if first_stage.dim() == 1:
        first_stage = first_stage.unsqueeze(1)
    last_step_features = X_tensor[:, -1, :]

    # 1. Search space
    # PCA cannot keep more components than min(n_samples, n_features), so the
    # upper bound of 10 is capped by the data actually available.
    max_components = min(10, first_stage.shape[0], first_stage.shape[1])
    n_components = trial.suggest_int('n_components', min(2, max_components), max_components)  # PCA output dimension
    use_nonlinear = trial.suggest_categorical('use_nonlinear', [True, False])  # add squared features
    use_interaction = trial.suggest_categorical('use_interaction', [True, False])  # add interaction features

    # 2. Reduce the first-stage outputs
    pca = PCA(n_components=n_components)
    reduced_features = pca.fit_transform(first_stage.numpy())

    # 3. Feature expansion
    expanded_features = torch.tensor(reduced_features, dtype=torch.float32)
    if use_nonlinear:
        nonlinear_features = first_stage ** 2
        expanded_features = torch.cat([expanded_features, nonlinear_features], dim=1)
    if use_interaction:
        interaction_features = first_stage * last_step_features  # interaction with the original features
        expanded_features = torch.cat([expanded_features, interaction_features], dim=1)

    # 4. Append to the original last-step features and add a time-step axis
    X_second_stage = torch.cat([last_step_features, expanded_features], dim=1)
    X_second_stage = X_second_stage.unsqueeze(1)

    # 5. Second-stage model (GRU)
    second_model = GRUModel(input_size=X_second_stage.shape[2], hidden_size=64, num_layers=2)
    optimizer = optim.Adam(second_model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Train and validate on the second-stage features built above. Loading
    # X_train / X_val here would ignore them, and their feature width does not
    # match the model's input_size.
    X_train_2, X_val_2, y_train_2, y_val_2 = train_test_split(
        X_second_stage, y_tensor, test_size=0.2, random_state=42
    )
    train_loader = DataLoader(TensorDataset(X_train_2, y_train_2), batch_size=32, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val_2, y_val_2), batch_size=32, shuffle=False)

    # Training: only 10 epochs per trial to keep the search cheap
    for epoch in range(10):
        second_model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            predictions = second_model(X_batch)
            loss = criterion(predictions, y_batch)
            loss.backward()
            optimizer.step()

    # Validation loss
    second_model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            val_predictions = second_model(X_batch)
            val_loss += criterion(val_predictions, y_batch).item()
    return val_loss / len(val_loader)

# 使用 Optuna 進行調參
n_search_trials = 20
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=n_search_trials)

# Report the best parameter set found by the search
best_params = study.best_params
print("最佳參數:", best_params)
