In [1]:
#poetry add torch torchvision torchaudio (or pip install torch torchvision torchaudio)

import torch
import torch.nn as nn
import numpy as np
import pandas as pd

In [None]:
 #建構LSTM模型(使用PyTorch)
 
 
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # 初始化隱藏狀態和細胞狀態
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # LSTM 傳播
        out, _ = self.lstm(x, (h0, c0))

        # 取最後一個時間步的輸出
        out = self.fc(out[:, -1, :])
        return out

# 模型參數
input_size = 2  # 輸入特徵數量：車速和車流量
hidden_size = 33  # 隱藏層大小（可調整的超參數）################## 1
num_layers = 3  # LSTM 層數（可調整的超參數）################### 2
output_size = 1  # 輸出特徵數量：預測車速

In [41]:
#訓練集名稱
file_name = "m05a_05F0287N_05F0055N_trainingDataset"
#訓練集所在路徑
file_path = r"D:\緯育課程\專題\TJR102_project\Data_M05A\m05a_05F0287N_05F0055N_trainingDataset.csv" #請替換成自己清理後訓練集(2024)資料的位置

#將處理好的資料匯入
def load_data(file_path):
    df = pd.read_csv(file_path)
    # 確保資料是按照時間排序的
    # 如果您的資料沒有時間戳記或已確保排序，可以省略這一步
    # df = df.sort_values(by='時間戳記') # 假設有時間戳記列

    # 將特徵轉換為 PyTorch 張量
    # 使用 'Avg_speed' 和 'Total_volume' 為特徵
    # 並且Avg_speed是我們要預測的目標變數
    features = df[['Avg_speed', 'Total_volume']].values # 輸入特徵
    target = df['Avg_speed'].values # 預測目標

    # 將 numpy 陣列轉換為 PyTorch 張量
    features = torch.tensor(features, dtype=torch.float32)
    target = torch.tensor(target, dtype=torch.float32)
    return features, target

features, target = load_data(file_path)

# 時間序列資料通常需要將資料組織成序列 (sequence) 的形式
def create_sequences(features, target, sequence_length):
    xs, ys = [], []
    for i in range(len(features) - sequence_length):
        x = features[i:(i + sequence_length)]
        y = target[i + sequence_length]
        xs.append(x)
        ys.append(y)
    return torch.stack(xs), torch.stack(ys)

# 使用過去 40 個時間步的資料來預測，例如，使用過去 N 個時間步的資料來預測下一個時間步; 就是學長說的windows(多少個輸入值預測一個輸出值)
sequence_length = 40 #時間步長度（可調整的超參數）################## 3
X, y = create_sequences(features, target, sequence_length)

# 將訓練資料分割成訓練集和驗證集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]


In [None]:
# 處理訓練集和驗證集的資料for訓練和評估


from torch.utils.data import DataLoader, TensorDataset

# 將資料打包成 TensorDataset
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

batch_size = 64 # 可調整的超參數 #################### 4
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 初始化模型、損失函數和優化器
model = LSTMModel(input_size, hidden_size, num_layers, output_size)
criterion = nn.MSELoss() # 常用於回歸任務  # 可調整的超參數############ 5  但這邊就固定使用MSELoss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 優化器  可調整的超參數############ 6   # 學習率 可調整的超參數############ 7

epochs = 65 # 可調整的超參數######################## 8

# 訓練模型
for epoch in range(epochs):
    model.train() # 設置模型為訓練模式
    for batch_X, batch_y in train_loader:
        # 前向傳播
        outputs = model(batch_X)
        loss = criterion(outputs.squeeze(), batch_y) # outputs.squeeze() 確保形狀匹配

        # 反向傳播和優化
        optimizer.zero_grad() # 清除之前的梯度
        loss.backward() # 計算梯度
        optimizer.step() # 更新權重

    print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# 在測試集上評估模型
model.eval() # 設置模型為評估模式
test_predictions = []
test_actuals = []
with torch.no_grad(): # 在評估時禁用梯度計算
    for batch_X, batch_y in test_loader:
        outputs = model(batch_X)
        test_predictions.extend(outputs.squeeze().tolist())
        test_actuals.extend(batch_y.tolist())

# 將預測結果轉換為 numpy 陣列以便計算 MAPE
test_predictions = np.array(test_predictions)
test_actuals = np.array(test_actuals)

Epoch [1/65], Loss: 3321.9951
Epoch [2/65], Loss: 2133.8003
Epoch [3/65], Loss: 1350.9557
Epoch [4/65], Loss: 820.7831
Epoch [5/65], Loss: 574.0102
Epoch [6/65], Loss: 380.4660
Epoch [7/65], Loss: 238.9210
Epoch [8/65], Loss: 255.1575
Epoch [9/65], Loss: 201.1940
Epoch [10/65], Loss: 225.7889
Epoch [11/65], Loss: 225.8472
Epoch [12/65], Loss: 250.7917
Epoch [13/65], Loss: 81.0407
Epoch [14/65], Loss: 14.4840
Epoch [15/65], Loss: 8.9532
Epoch [16/65], Loss: 10.4659
Epoch [17/65], Loss: 5.0741
Epoch [18/65], Loss: 7.0192
Epoch [19/65], Loss: 3.9650
Epoch [20/65], Loss: 3.9339
Epoch [21/65], Loss: 3.7816
Epoch [22/65], Loss: 7.7098
Epoch [23/65], Loss: 2.3503
Epoch [24/65], Loss: 3.2127
Epoch [25/65], Loss: 7.9650
Epoch [26/65], Loss: 5.5338
Epoch [27/65], Loss: 3.8213
Epoch [28/65], Loss: 4.3955
Epoch [29/65], Loss: 3.2846
Epoch [30/65], Loss: 3.4099
Epoch [31/65], Loss: 5.4585
Epoch [32/65], Loss: 2.6513
Epoch [33/65], Loss: 3.5236
Epoch [34/65], Loss: 2.6098
Epoch [35/65], Loss: 7.3066

In [43]:
# 計算MAPE


def mean_absolute_percentage_error(y_true, y_pred):
    """
    計算平均絕對百分比誤差 (MAPE)
    y_true: 實際值 (numpy array)
    y_pred: 預測值 (numpy array)
    """
    # 避免除以零的情況，可以設定一個非常小的值作為分母的下限
    # 或者濾掉 y_true 為零的點，因為對於車速來說，通常不會是零
    # 根據實際情況選擇處理方式
    
    # 這裡我們濾掉 y_true 為零的點，因為 MAPE 在實際值為零時無意義
    non_zero_indices = y_true != 0
    y_true_filtered = y_true[non_zero_indices]
    y_pred_filtered = y_pred[non_zero_indices]
    
    if len(y_true_filtered) == 0:
        return float('inf') # 如果沒有非零的實際值，MAPE 無法計算

    return np.mean(np.abs((y_true_filtered - y_pred_filtered) / y_true_filtered)) * 100

# 計算測試集上的 MAPE
mape = mean_absolute_percentage_error(test_actuals, test_predictions)
print(f'測試集 MAPE: {mape:.2f}%')

測試集 MAPE: 2.30%


In [44]:
# 保存訓練好的模型


# 定義模型保存的路徑和檔名
model_save_path = 'best_lstm_model.pth'

# 保存模型的狀態字典
torch.save(model.state_dict(), model_save_path)
print(f"模型已保存至: {model_save_path}")

模型已保存至: best_lstm_model.pth
