In [None]:
!pip install xgboost

In [None]:
import pandas as pd
import numpy as np
from geopy.distance import geodesic
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from xgboost import XGBRegressor
import matplotlib.pyplot as plt

# Chart style defaults, applied in one shot.
# 'Microsoft JhengHei' is selected as the font family (the axis labels and titles
# used later are Chinese text); unicode_minus is turned off alongside it.
plt.rcParams.update({
    'font.family': 'Microsoft JhengHei',
    'axes.unicode_minus': False,
    'axes.titlesize': 14,
    'axes.labelsize': 12,
    'xtick.labelsize': 10,
    'ytick.labelsize': 10,
    'figure.figsize': (8, 6),
    'figure.dpi': 100,
})

# 1. Load the data and derive the model features
df = pd.read_csv(r"C:\檔名.csv")
df['e_time'] = pd.to_datetime(df['e_time'])

# Time-of-day / day-of-week features taken from the e_time timestamp
df = df.assign(
    Hour=df['e_time'].dt.hour,
    Minute=df['e_time'].dt.minute,
    Weekday=df['e_time'].dt.weekday,
)

# Distance in metres from each reported position to a fixed target coordinate
target_lat, target_lon = 25.026622, 121.53006


def _distance_to_target_m(row):
    """Return the geodesic distance in metres from the row's position to the target.

    Returns NaN when either PositionLat or PositionLon is missing.
    """
    lat, lon = row['PositionLat'], row['PositionLon']
    if pd.isnull(lat) or pd.isnull(lon):
        return np.nan
    return geodesic((lat, lon), (target_lat, target_lon)).meters


df['Distance'] = df.apply(_distance_to_target_m, axis=1)

# Keep rows at most 20 000 m from the target (rows with a NaN distance fail the
# comparison and drop out), then integer-encode the plate number.
df_filtered = df.loc[df['Distance'].le(20000)].copy()
df_filtered['PlateNumb_encoded'] = df_filtered['PlateNumb'].astype('category').cat.codes

# Feature sets: B is the base list, A is B plus EstimateTime as the last column
feature_cols_B = ['PlateNumb_encoded', 'Speed', 'Distance', 'rt_delay_sec', 'Hour', 'Minute', 'Weekday',
                  'peak', 'daytype', 'rain', 'temp', 'wind']
feature_cols_A = feature_cols_B + ['EstimateTime']

# 2. Train and evaluate both models (A / B) over 20 seeded random splits each
all_runs = {}
model_specs = {'模型 A': feature_cols_A, '模型 B': feature_cols_B}

for name, cols in model_specs.items():
    # Drop rows missing any feature or the target, then keep only targets
    # strictly between 0 and 5000.
    df_model = df_filtered.dropna(subset=cols + ['true_arrival_sec'])
    arrival = df_model['true_arrival_sec']
    df_model = df_model[arrival.gt(0) & arrival.lt(5000)]

    X, y = df_model[cols], df_model['true_arrival_sec']

    trials = []
    for seed in range(20):
        # The same seed drives both the 80/20 split and the regressor.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=True, random_state=seed
        )

        regressor = XGBRegressor(n_estimators=100, max_depth=6, learning_rate=0.1, random_state=seed)
        regressor.fit(X_train, y_train)
        y_pred = regressor.predict(X_test)

        trials.append({
            'mae': mean_absolute_error(y_test, y_pred),
            'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
            'r2': r2_score(y_test, y_pred),
            'model': regressor,
            'preds': (y_test, y_pred),
        })

    maes = [t['mae'] for t in trials]
    rmses = [t['rmse'] for t in trials]
    r2s = [t['r2'] for t in trials]
    avg_mae, avg_rmse, avg_r2 = np.mean(maes), np.mean(rmses), np.mean(r2s)

    # The "best" run is the one whose (MAE, RMSE, R²) triple has the smallest
    # squared distance to the mean triple, i.e. the most representative run,
    # not the one with the lowest error.
    # NOTE(review): MAE/RMSE and R² are on different scales, so R² presumably
    # contributes very little to this distance; confirm that is acceptable.
    distances = [
        (t['mae'] - avg_mae)**2 + (t['rmse'] - avg_rmse)**2 + (t['r2'] - avg_r2)**2
        for t in trials
    ]
    best_index = int(np.argmin(distances))
    chosen = trials[best_index]

    all_runs[name] = {
        '平均 MAE': round(avg_mae, 2),
        'MAE 標準差': round(np.std(maes), 2),
        '平均 RMSE': round(avg_rmse, 2),
        'RMSE 標準差': round(np.std(rmses), 2),
        '平均 R²': round(avg_r2, 4),
        'R² 標準差': round(np.std(r2s), 4),
        '最佳模型 index': best_index,
        '最佳 MAE': round(chosen['mae'], 2),
        '最佳 RMSE': round(chosen['rmse'], 2),
        '最佳 R²': round(chosen['r2'], 4),
        '最佳模型物件': chosen['model'],
        '最佳預測對照': chosen['preds']
    }

# 3. Print the summary metrics for each model.
# The stored model object and the (y_test, y_pred) pair are not printed.
skipped_keys = ('最佳模型物件', '最佳預測對照')
for name, res in all_runs.items():
    print(f"\n📊 {name}")
    for k, v in res.items():
        if k in skipped_keys:
            continue
        print(f"{k}: {v}")

# 4. Predicted vs. actual scatter plots (models A / B), styled for the slide
#    deck with a light-tan background.
for name in ['模型 A', '模型 B']:
    y_test_best, y_pred_best = all_runs[name]['最佳預測對照']

    fig, ax = plt.subplots(figsize=(6.5, 5))
    fig.patch.set_facecolor('#f5f0e6')  # whole-figure background
    ax.set_facecolor('#f5f0e6')         # plotting-area background

    ax.scatter(y_test_best, y_pred_best, alpha=0.7, color='#c1a47e', edgecolor='none')
    # Dashed y = x reference line spanning the range of the actual values
    lo, hi = y_test_best.min(), y_test_best.max()
    ax.plot([lo, hi], [lo, hi], 'k--', linewidth=1.2)

    ax.set_xlabel("實際到站時間 (秒)", color='black')
    ax.set_ylabel("預測到站時間 (秒)", color='black')
    ax.set_title(f"{name} - 預測與實際比較", pad=12, color='black')

    ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.4)
    ax.tick_params(colors='black')
    fig.tight_layout()

    filename = f"C:/Users/{name.strip()}_scatter.png"
    # Pass the figure's facecolor explicitly: on matplotlib releases where
    # savefig.facecolor does not default to 'auto', savefig would otherwise
    # write the PNG with a white figure background instead of the tan one.
    fig.savefig(filename, dpi=300, facecolor=fig.get_facecolor())
    plt.show()
    # Release the figure so figures do not accumulate across loop iterations.
    plt.close(fig)

# 5. Feature importance (printed as numbers only, no chart)
feature_cols_by_model = {'模型 A': feature_cols_A, '模型 B': feature_cols_B}

for name, cols in feature_cols_by_model.items():
    model = all_runs[name]['最佳模型物件']
    # Pair each importance score with its feature name, highest score first
    importance_series = pd.Series(model.feature_importances_, index=cols).sort_values(ascending=False)

    print(f"\n🔍 {name} 特徵重要性（由高到低）:")
    for rank, (feature, score) in enumerate(importance_series.items(), start=1):
        print(f"{rank}. {feature}：{round(score, 4)}")

In [None]:
import pandas as pd
import numpy as np
from geopy.distance import geodesic
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from xgboost import XGBRegressor
import pickle

# 1. Load the data and build the features
df = pd.read_csv(r"C:\Users\檔名.csv")
df['e_time'] = pd.to_datetime(df['e_time'])

# Hour, minute and weekday components of the e_time timestamp
e_time_parts = df['e_time'].dt
df['Hour'] = e_time_parts.hour
df['Minute'] = e_time_parts.minute
df['Weekday'] = e_time_parts.weekday

# Geodesic distance in metres from each position to the fixed target point;
# NaN when either coordinate is missing.
target_lat, target_lon = 25.026622, 121.53006
df['Distance'] = df.apply(
    lambda row: np.nan
    if pd.isnull(row['PositionLat']) or pd.isnull(row['PositionLon'])
    else geodesic((row['PositionLat'], row['PositionLon']), (target_lat, target_lon)).meters,
    axis=1
)

# Keep rows at most 20 000 m from the target and integer-encode the plate number
df_filtered = df[df['Distance'].le(20000)].copy()
plate_as_category = df_filtered['PlateNumb'].astype('category')
df_filtered['PlateNumb_encoded'] = plate_as_category.cat.codes

# Feature sets: A is the base list B with EstimateTime appended
feature_cols_B = ['PlateNumb_encoded', 'Speed', 'Distance', 'rt_delay_sec', 'Hour', 'Minute', 'Weekday',
                  'peak', 'daytype', 'rain', 'temp', 'wind']
feature_cols_A = [*feature_cols_B, 'EstimateTime']

# 2. Train and evaluate the models that will be exported
all_runs = {}

for name, cols in [('模型 A (含 EstimateTime)', feature_cols_A),
                   ('模型 B (不含 EstimateTime)', feature_cols_B)]:

    # Drop rows missing any feature or the target.
    df_model = df_filtered.dropna(subset=cols + ['true_arrival_sec'])
    # Keep only targets strictly between 0 and 5000, matching the filter used
    # by the evaluation cell of this notebook, so the exported models are
    # trained on the same kind of rows that the reported metrics describe.
    df_model = df_model[(df_model['true_arrival_sec'] > 0) & (df_model['true_arrival_sec'] < 5000)]

    X = df_model[cols]
    y = df_model['true_arrival_sec']

    maes, rmses, r2s, models = [], [], [], []

    # 20 repetitions; the loop index seeds both the 80/20 split and the regressor.
    for i in range(20):
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=True, random_state=i
        )

        model = XGBRegressor(n_estimators=100, max_depth=6, learning_rate=0.1, random_state=i)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        maes.append(mean_absolute_error(y_test, y_pred))
        rmses.append(np.sqrt(mean_squared_error(y_test, y_pred)))
        r2s.append(r2_score(y_test, y_pred))
        models.append(model)

    # Averages over the runs, and the single run closest to those averages.
    # "Best" therefore means most representative of the 20 runs, not lowest error.
    avg_mae, avg_rmse, avg_r2 = np.mean(maes), np.mean(rmses), np.mean(r2s)
    distances = [(mae - avg_mae)**2 + (rmse - avg_rmse)**2 + (r2 - avg_r2)**2
                 for mae, rmse, r2 in zip(maes, rmses, r2s)]
    best_index = int(np.argmin(distances))
    best_model = models[best_index]

    all_runs[name] = {
        '平均 MAE': round(avg_mae, 2),
        'MAE 標準差': round(np.std(maes), 2),
        '平均 RMSE': round(avg_rmse, 2),
        'RMSE 標準差': round(np.std(rmses), 2),
        '平均 R²': round(avg_r2, 4),
        'R² 標準差': round(np.std(r2s), 4),
        '最佳模型 index': best_index,
        '最佳 MAE': round(maes[best_index], 2),
        '最佳 RMSE': round(rmses[best_index], 2),
        '最佳 R²': round(r2s[best_index], 4),
        '最佳模型物件': best_model
    }

# Plate number -> integer code lookup, built from the categories of the
# PlateNumb column of df_filtered (position in the category index = code).
plate_categories = df_filtered['PlateNumb'].astype('category').cat.categories
plate_mapping = dict(zip(plate_categories, range(len(plate_categories))))

# 3. Print the summary metrics for each model (the model object itself is skipped)
for name, res in all_runs.items():
    print(f"\n📊 {name}")
    for k, v in res.items():
        if k == '最佳模型物件':
            continue
        print(f"{k}: {v}")

# 4. Save as .pkl
# Each model is bundled in a dict together with the plate-number mapping.

# Most frequent plate in the filtered data, stored with its code as the fallback
# (presumably for plates missing from plate_mapping at prediction time; confirm
# against the code that loads these packs).
most_common_plate = df_filtered['PlateNumb'].value_counts().idxmax()
default_encoded = plate_mapping[most_common_plate]

# Model packs, including the fallback encoding information
modelA_pack = {
    'model': all_runs['模型 A (含 EstimateTime)']['最佳模型物件'],
    'plate_mapping': plate_mapping,
    'default_plate': most_common_plate,
    'default_encoded': default_encoded
}

modelB_pack = {
    'model': all_runs['模型 B (不含 EstimateTime)']['最佳模型物件'],
    'plate_mapping': plate_mapping,
    'default_plate': most_common_plate,
    'default_encoded': default_encoded
}

# Destination files (placeholders; replace with the real output locations).
# The two paths MUST differ: writing both packs to one path makes the second
# dump overwrite the first, leaving only model B on disk.
model_a_path = r"C:\Users\欲存成檔名_A.pkl"
model_b_path = r"C:\Users\欲存成檔名_B.pkl"

for pack, path in ((modelA_pack, model_a_path), (modelB_pack, model_b_path)):
    with open(path, "wb") as f:
        pickle.dump(pack, f)