In [17]:
# ===========================
# Cell 1: 導入套件和設定（改進版，抑制警告）
# ===========================
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import gc
import os
import time
import joblib
import json
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, f1_score, balanced_accuracy_score
import lightgbm as lgb

# Silence warnings: the environment variable is set for the benefit of any
# child Python processes, the filter applies to this interpreter session
os.environ['PYTHONWARNINGS'] = 'ignore'
warnings.simplefilter('ignore')

# 檢查 GPU 支援（簡化版）
def check_gpu_support():
    """Return True when LightGBM can train one boosting round on the GPU.

    A tiny random binary-classification dataset (100 rows x 10 columns) is
    trained for a single round with ``device_type='gpu'``.  Any ordinary
    failure of that probe is reported as ``False`` so the caller can fall
    back to the CPU.

    Returns:
        bool: True if the one-round GPU training succeeded, False otherwise.
    """
    import contextlib

    try:
        # Throw-away probe data
        probe_data = lgb.Dataset(
            np.random.rand(100, 10),
            label=np.random.randint(0, 2, 100)
        )
        probe_params = {
            'device_type': 'gpu',
            'objective': 'binary',
            'verbose': -1,
            'force_row_wise': True
        }

        # Run the probe with Python-level stdout redirected to the null device;
        # redirect_stdout restores sys.stdout even when training raises
        with open(os.devnull, 'w') as devnull, contextlib.redirect_stdout(devnull):
            lgb.train(probe_params, probe_data, num_boost_round=1)
        return True
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and SystemExit
        # must still be able to stop the notebook
        return False

# Report the environment
print("環境檢查:")
print(f"LightGBM: {lgb.__version__}")

# Probe the GPU once and pick the LightGBM device accordingly
gpu_available = check_gpu_support()
device_type = 'gpu' if gpu_available else 'cpu'
print("✅ GPU 加速可用！" if gpu_available else "❌ GPU 加速不可用，將使用 CPU")

環境檢查:
LightGBM: 4.3.0
✅ GPU 加速可用！


In [19]:
# ===========================
# Cell 2: 載入和準備資料
# ===========================
print("\n載入資料...")
file_path = 'us-accidents/US_Accidents_March23.csv'

# Read the whole CSV into memory
df = pd.read_csv(file_path)
print(f"原始資料大小: {df.shape}")

# Row count and percentage share per Severity value, in ascending Severity order
print("\n目標變數分布:")
severity_counts = df['Severity'].value_counts().sort_index()
total_rows = len(df)
for sev, count in severity_counts.to_dict().items():
    print(f"Severity {sev}: {count:,} ({count/total_rows*100:.2f}%)")


載入資料...
原始資料大小: (7728394, 46)

目標變數分布:
Severity 1: 67,366 (0.87%)
Severity 2: 6,156,981 (79.67%)
Severity 3: 1,299,337 (16.81%)
Severity 4: 204,710 (2.65%)


In [20]:
# ===========================
# Cell 3: 進階特徵工程和預處理（改進版）
# ===========================
print("\n準備特徵（進階預處理）...")

# Calendar features derived from the accident start time
df['Start_Time'] = pd.to_datetime(df['Start_Time'])
df['Hour'] = df['Start_Time'].dt.hour
df['DayOfWeek'] = df['Start_Time'].dt.dayofweek
df['Month'] = df['Start_Time'].dt.month
df['DayOfMonth'] = df['Start_Time'].dt.day
df['Year'] = df['Start_Time'].dt.year

# Additional time features, computed column-wise instead of with a per-row
# Python lambda (same values, far less work on millions of rows)
df['IsWeekend'] = (df['DayOfWeek'] >= 5).astype(int)
# Rush hour = 06-09 or 16-19 inclusive; a missing hour yields 0, as before
df['IsRushHour'] = (df['Hour'].between(6, 9) | df['Hour'].between(16, 19)).astype(int)
# Season code: 1=winter, 2=spring, 3=summer, 4=autumn
df['Season'] = (df['Month'] % 12 + 3) // 3

# Feature selection
numeric_features = [
    'Start_Lat', 'Start_Lng', 
    'Distance(mi)', 'Temperature(F)', 'Wind_Chill(F)', 
    'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 
    'Wind_Speed(mph)', 'Precipitation(in)',
    'Hour', 'DayOfWeek', 'Month', 'DayOfMonth', 'Year',
    'IsWeekend', 'IsRushHour', 'Season'
]

boolean_features = [
    'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 
    'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 
    'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop'
]

categorical_features = [
    'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 
    'Astronomical_Twilight'
]

# Combine the three groups and keep only columns that exist in the data
all_features = numeric_features + boolean_features + categorical_features
existing_features = [col for col in all_features if col in df.columns]

# Feature matrix (a copy, so later cleaning does not touch df)
X = df[existing_features].copy()

# ===========================
# Data cleaning and outlier handling
# ===========================
print("\n進行資料清理...")

# 1. 處理異常值（使用 IQR 方法）
def handle_outliers(df, columns, method='clip', factor=3.0):
    """Limit extreme values of the given columns using IQR-based fences.

    For every listed column that exists in ``df`` the fences are
    ``Q1 - factor * IQR`` and ``Q3 + factor * IQR``.

    Args:
        df: pandas DataFrame to process.
        columns: iterable of column names; names absent from ``df`` are skipped.
        method: ``'clip'`` replaces values beyond a fence with the fence
            value; ``'remove'`` drops rows whose value lies beyond a fence.
        factor: IQR multiplier for the fences.  Defaults to 3.0, the value
            that used to be hard-coded.

    Returns:
        The processed DataFrame.  With ``'clip'`` the columns are reassigned
        on the frame that was passed in and that same object is returned;
        with ``'remove'`` a filtered frame is returned.

    Raises:
        ValueError: if ``method`` is neither ``'clip'`` nor ``'remove'``
            (previously such a value silently did nothing).
    """
    if method not in ('clip', 'remove'):
        raise ValueError(f"method must be 'clip' or 'remove', got {method!r}")

    for col in columns:
        if col not in df.columns:
            continue

        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        margin = factor * (q3 - q1)
        lower_bound = q1 - margin
        upper_bound = q3 + margin

        if method == 'clip':
            df[col] = df[col].clip(lower_bound, upper_bound)
        else:
            # A missing value is not an outlier: keep those rows so that a
            # later imputation step can deal with them (a plain comparison
            # would have dropped them because NaN compares as False)
            keep = df[col].between(lower_bound, upper_bound) | df[col].isna()
            df = df[keep]

    return df

# Clip outliers, restricted to the weather measurements
weather_features = ['Temperature(F)', 'Humidity(%)', 'Pressure(in)', 
                   'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)']
X = handle_outliers(X, weather_features, method='clip')

# 2. Missing-value imputation
print("處理缺失值...")

# Numeric features: fill gaps with the column median
for col in [name for name in numeric_features if name in X.columns]:
    X[col] = X[col].fillna(X[col].median())

# Boolean features: encode True/False as 1/0, gaps (and unmapped values) become 0
bool_codes = {True: 1, False: 0}
for col in [name for name in boolean_features if name in X.columns]:
    X[col] = X[col].map(bool_codes).fillna(0)

# Categorical features: fill gaps with the most frequent value
# ('Unknown' when the column has no mode), then replace labels by integer codes
for col in [name for name in categorical_features if name in X.columns]:
    modes = X[col].mode()
    mode_val = modes[0] if len(modes) > 0 else 'Unknown'
    X[col] = pd.Categorical(X[col].fillna(mode_val)).codes

# 3. Feature scaling (the original author marks this as optional for LightGBM)
from sklearn.preprocessing import RobustScaler

scaler = RobustScaler()
numeric_cols = [col for col in numeric_features if col in X.columns]
X[numeric_cols] = scaler.fit_transform(X[numeric_cols])

# Persist the fitted scaler so prediction code can apply the same transform.
# The output directory is created first; without it the dump fails on a
# fresh checkout where ./model_output does not exist yet
os.makedirs('./model_output', exist_ok=True)
joblib.dump(scaler, './model_output/feature_scaler.pkl')

# ===========================
# 處理類別不平衡
# ===========================
print("\n處理類別不平衡...")

# Target: Severity 1-4 shifted down to class ids 0-3
y = df['Severity'].values - 1

# Report the class distribution
unique, counts = np.unique(y, return_counts=True)
print("原始類別分布:")
for cls, cnt in zip(unique, counts):
    share = cnt/len(y)*100
    print(f"  Severity {cls+1}: {cnt:,} ({share:.2f}%)")

# Balanced class weights
from sklearn.utils.class_weight import compute_class_weight

class_weights = compute_class_weight('balanced', classes=np.unique(y), y=y)

# Map class id -> weight
# NOTE(review): class_weight_dict is not referenced by the training cells
# visible in this notebook — confirm whether the weights were meant to be used
class_weight_dict = dict(enumerate(class_weights))
print("\n計算的類別權重:")
for cls, weight in class_weight_dict.items():
    print(f"  Severity {cls+1}: {weight:.4f}")

# Optional SMOTE oversampling; switch the flag to True to enable it
use_smote = False

if not use_smote:
    X_resampled, y_resampled = X, y
else:
    from imblearn.over_sampling import SMOTE
    print("\n應用 SMOTE 過採樣...")
    smote = SMOTE(random_state=42, k_neighbors=5)
    X_resampled, y_resampled = smote.fit_resample(X, y)
    print(f"過採樣後大小: {X_resampled.shape}")

print(f"\n最終特徵矩陣大小: {X_resampled.shape}")

# Remember the column order of the feature matrix
feature_names = X.columns.tolist()


# ===== Hand the final data over to the later training cells =====
# An ndarray result is wrapped back into a DataFrame with the original column
# names; a DataFrame is copied
X_final = (
    pd.DataFrame(X_resampled, columns=feature_names)
    if isinstance(X_resampled, np.ndarray)
    else X_resampled.copy()
)

y_final = y_resampled.astype(int)                  # integer class ids 0-3




準備特徵（進階預處理）...

進行資料清理...
處理缺失值...

處理類別不平衡...
原始類別分布:
  Severity 1: 67,366 (0.87%)
  Severity 2: 6,156,981 (79.67%)
  Severity 3: 1,299,337 (16.81%)
  Severity 4: 204,710 (2.65%)

計算的類別權重:
  Severity 1: 28.6806
  Severity 2: 0.3138
  Severity 3: 1.4870
  Severity 4: 9.4382

最終特徵矩陣大小: (7728394, 35)


In [None]:
# ===========================
# Cell 4: 使用 K-Fold 交叉驗證評估 LightGBM（修正版，強化 early stopping）
# ===========================
print("\n" + "="*60)
print(f"訓練 LightGBM 模型（使用全部數據，{device_type.upper()} 模式）")
print("="*60)

# LightGBM parameters
lgb_params = {
    'objective': 'multiclass',
    'num_class': 4,
    'metric': 'multi_logloss',
    'boosting_type': 'gbdt',
    'num_leaves': 63,
    'max_depth': 7,
    'learning_rate': 0.05,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'reg_alpha': 0.01,
    'reg_lambda': 0.01,
    'min_split_gain': 0.01,
    'min_child_weight': 1,
    'min_child_samples': 20,
    'verbose': -1,
    'random_state': 42,
    'extra_trees': True,
    'path_smooth': 0.2,
    'force_row_wise': True,  # set by the original author to avoid a warning
    'device_type': device_type,
}

# Device-specific parameters
if device_type == 'gpu':
    lgb_params.update({
        'max_bin': 63,
        'gpu_use_dp': False,
        'num_threads': 0,
    })
else:
    lgb_params['n_jobs'] = -1

# K-Fold cross-validation setup
n_splits = 5
print(f"\n進行 {n_splits}-Fold 交叉驗證...")
# Report the size of the matrix that is actually split and trained on
# (X_final), which differs from X when oversampling is enabled
print(f"資料集大小: {len(X_final):,} 筆")
print("使用 Early Stopping（patience=200）")
print("最大迭代次數: 10,000（如果需要可跑整晚）\n")

kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)
# Per-fold results, one list entry per fold
cv_scores = {
    'accuracy': [],
    'f1': [],
    'balanced_accuracy': [],
    'best_iterations': [],
    'training_times': []
}

total_start_time = time.time()

# 改進的進度回調類別
class ProgressCallback:
    """Training callback that prints a progress line for one CV fold.

    The score is read from the last entry of ``env.evaluation_result_list``.
    The best score is tracked on every call, so "Best" and "No improve" in
    the printed line are exact; previously they were only refreshed on the
    iterations that printed.  A line is printed on iteration 1 and then
    every ``print_freq`` iterations.

    Args:
        fold_num: fold number shown in the printed line.
        print_freq: print every this many iterations.
    """

    def __init__(self, fold_num, print_freq=100):
        self.fold_num = fold_num
        self.print_freq = print_freq
        self.start_time = time.time()
        self.last_print_time = time.time()
        self.best_score = float('inf')   # lower is treated as better
        self.best_iteration = 0
        self.no_improve_rounds = 0

    def __call__(self, env):
        # Ignore anything that does not carry an iteration counter
        if not hasattr(env, 'iteration'):
            return

        iteration = env.iteration + 1  # shown 1-based

        results = getattr(env, 'evaluation_result_list', None)
        has_results = bool(results)
        if has_results:
            result = results[-1]
            if isinstance(result, tuple):
                # presumably (dataset, metric, value, ...) or (metric, value);
                # the value position is chosen by tuple length
                current_score = result[2] if len(result) >= 3 else result[1]
            else:
                current_score = self.best_score

            # Track the best score on every iteration, not only when printing
            if current_score < self.best_score:
                self.best_score = current_score
                self.best_iteration = iteration
                self.no_improve_rounds = 0
            else:
                self.no_improve_rounds = iteration - self.best_iteration

        # Only print at the configured frequency (and on the first iteration)
        if iteration % self.print_freq != 0 and iteration != 1:
            return

        now = time.time()
        elapsed = now - self.start_time
        speed = iteration / elapsed if elapsed > 0 else 0
        self.last_print_time = now

        if has_results:
            print(f"  [Fold {self.fold_num}] Iter {iteration:5d} | "
                  f"Valid Loss: {current_score:.6f} | "
                  f"Best: {self.best_score:.6f} @ {self.best_iteration} | "
                  f"No improve: {self.no_improve_rounds} | "
                  f"Speed: {speed:.1f} it/s")
        else:
            print(f"  [Fold {self.fold_num}] Iter {iteration:5d} | "
                  f"Speed: {speed:.1f} it/s")

# Train and evaluate one model per fold
# The fold count comes from the splitter so that the "Fold i/N" line and the
# remaining-time estimate stay correct when n_splits is changed
n_folds = kfold.get_n_splits()
for fold, (train_idx, val_idx) in enumerate(kfold.split(X_final, y_final)):
    fold_start_time = time.time()
    print(f"\n{'='*50}")
    print(f"Fold {fold + 1}/{n_folds}")
    print(f"{'='*50}")
    print(f"訓練集: {len(train_idx):,} 筆")
    print(f"驗證集: {len(val_idx):,} 筆")
    
    # Split the data (positional indexing for a DataFrame, plain indexing otherwise)
    if isinstance(X_final, pd.DataFrame):
        X_train_fold = X_final.iloc[train_idx]
        X_val_fold = X_final.iloc[val_idx]
    else:
        X_train_fold = X_final[train_idx]
        X_val_fold = X_final[val_idx]
    
    y_train_fold = y_final[train_idx]
    y_val_fold = y_final[val_idx]
    
    # Build the LightGBM datasets; the validation set reuses the training set as reference
    train_data = lgb.Dataset(X_train_fold, label=y_train_fold)
    valid_data = lgb.Dataset(X_val_fold, label=y_val_fold, reference=train_data)
    
    # Train the model
    print(f"\n開始訓練（{device_type.upper()}）...")
    print("設定: num_boost_round=10000, early_stopping_rounds=200")
    
    try:
        # Built-in early stopping plus the custom progress callback;
        # log_evaluation(0) turns the default per-iteration log off
        callbacks = [
            lgb.early_stopping(stopping_rounds=200, verbose=False),
            lgb.log_evaluation(0),
            ProgressCallback(fold + 1, print_freq=100)
        ]
        
        model_fold = lgb.train(
            lgb_params,
            train_data,
            valid_sets=[valid_data],
            valid_names=['valid'],
            num_boost_round=10000,  # large on purpose; early stopping decides when to stop
            callbacks=callbacks
        )
        
    except Exception as e:
        print(f"\n訓練錯誤: {str(e)}")
        print("嘗試使用簡化的訓練方式...")
        
        # Fallback: built-in callbacks only and a lower round limit
        model_fold = lgb.train(
            lgb_params,
            train_data,
            valid_sets=[valid_data],
            num_boost_round=2000,
            callbacks=[
                lgb.early_stopping(stopping_rounds=200),
                lgb.log_evaluation(100)
            ]
        )
    
    # Predict on the held-out fold and score it
    y_pred = model_fold.predict(X_val_fold, num_iteration=model_fold.best_iteration)
    y_pred_class = np.argmax(y_pred, axis=1)
    
    acc = accuracy_score(y_val_fold, y_pred_class)
    f1 = f1_score(y_val_fold, y_pred_class, average='weighted')
    balanced_acc = balanced_accuracy_score(y_val_fold, y_pred_class)
    
    cv_scores['accuracy'].append(acc)
    cv_scores['f1'].append(f1)
    cv_scores['balanced_accuracy'].append(balanced_acc)
    cv_scores['best_iterations'].append(model_fold.best_iteration)
    
    fold_time = time.time() - fold_start_time
    cv_scores['training_times'].append(fold_time)
    
    print(f"\nFold {fold + 1} 完成！")
    print(f"  最佳迭代次數: {model_fold.best_iteration}")
    print(f"  準確率: {acc:.4f}")
    print(f"  F1分數: {f1:.4f}")
    print(f"  平衡準確率: {balanced_acc:.4f}")
    print(f"  訓練時間: {fold_time/60:.2f} 分鐘")
    
    # Estimate the remaining time from the average fold duration so far
    if fold < n_folds - 1:
        avg_fold_time = np.mean(cv_scores['training_times'])
        remaining_folds = n_folds - 1 - fold
        eta_minutes = (avg_fold_time * remaining_folds) / 60
        print(f"\n預估剩餘時間: {eta_minutes:.1f} 分鐘")
    
    # Release the per-fold objects before the next fold
    del model_fold, train_data, valid_data
    gc.collect()

# Cross-validation summary: mean and standard deviation over the folds
total_cv_time = time.time() - total_start_time
separator = "="*60
print("\n" + separator)
print("交叉驗證結果:")
print(separator)
print(f"使用設備: {device_type.upper()}")
for label, key in (
    ('平均準確率', 'accuracy'),
    ('平均F1分數', 'f1'),
    ('平均平衡準確率', 'balanced_accuracy'),
):
    fold_values = cv_scores[key]
    print(f"{label}: {np.mean(fold_values):.4f} (±{np.std(fold_values):.4f})")
best_iters = cv_scores['best_iterations']
print(f"平均最佳迭代次數: {np.mean(best_iters):.0f} (±{np.std(best_iters):.0f})")
print(f"平均每個 Fold 時間: {np.mean(cv_scores['training_times'])/60:.2f} 分鐘")
print(f"\n總交叉驗證時間: {total_cv_time/60:.2f} 分鐘")


訓練 LightGBM 模型（使用全部數據，GPU 模式）

進行 5-Fold 交叉驗證...
資料集大小: 7,728,394 筆
使用 Early Stopping（patience=200）
最大迭代次數: 10,000（如果需要可跑整晚）


Fold 1/5
訓練集: 6,182,715 筆
驗證集: 1,545,679 筆

開始訓練（GPU）...
設定: num_boost_round=10000, early_stopping_rounds=200
  [Fold 1] Iter     1 | Valid Loss: 0.601082 | Best: 0.601082 @ 1 | No improve: 0 | Speed: 0.5 it/s
  [Fold 1] Iter   100 | Valid Loss: 0.410035 | Best: 0.410035 @ 100 | No improve: 0 | Speed: 2.0 it/s
  [Fold 1] Iter   200 | Valid Loss: 0.393007 | Best: 0.393007 @ 200 | No improve: 0 | Speed: 1.8 it/s
  [Fold 1] Iter   300 | Valid Loss: 0.384137 | Best: 0.384137 @ 300 | No improve: 0 | Speed: 1.8 it/s
  [Fold 1] Iter   400 | Valid Loss: 0.378466 | Best: 0.378466 @ 400 | No improve: 0 | Speed: 1.8 it/s
  [Fold 1] Iter   500 | Valid Loss: 0.374430 | Best: 0.374430 @ 500 | No improve: 0 | Speed: 1.8 it/s
  [Fold 1] Iter   600 | Valid Loss: 0.371515 | Best: 0.371515 @ 600 | No improve: 0 | Speed: 1.9 it/s
  [Fold 1] Iter   700 | Valid Loss: 0.368919 | Bes

In [None]:
# ===========================
# Cell 5: 使用全部數據訓練最終模型（強化 early stopping）
# ===========================
banner = "="*60
print("\n" + banner)
print("使用全部數據訓練最終模型")
print(banner)
print(f"訓練集大小: {len(X_final):,} 筆")
print(f"使用設備: {device_type.upper()}")

# Round budget derived from cross-validation: 1.5x the mean best iteration is
# reported as the suggestion; the hard limit is twice that, but never below 20000
mean_best_iteration = np.mean(cv_scores['best_iterations'])
suggested_rounds = int(mean_best_iteration * 1.5)
max_rounds = max(20000, suggested_rounds * 2)

print(f"建議迭代次數: {suggested_rounds}")
print(f"最大迭代次數: {max_rounds} (可跑整晚)")
print("Early stopping: 500 輪無改善即停止")
print("\n如果想要更長時間的訓練，模型會自動使用 early stopping 在最佳點停止")

# Dataset over all rows; the DataFrame and non-DataFrame cases are built the
# same way, so no type check is needed
train_data_full = lgb.Dataset(X_final, label=y_final)

# 最終訓練進度回調
class FinalTrainingCallback:
    """Training callback that prints progress for the final model.

    The score is read from the FIRST entry of ``env.evaluation_result_list``
    and labelled "Train Loss" in the output.  The best score is tracked on
    every call, so "Best" and "No improve" are exact; previously they were
    only refreshed on the iterations that printed.  A line is printed on
    iteration 1 and then every ``print_freq`` iterations, and only when a
    tuple-shaped result is available.

    Args:
        print_freq: print every this many iterations.
    """

    def __init__(self, print_freq=200):
        self.print_freq = print_freq
        self.start_time = time.time()
        self.best_score = float('inf')   # lower is treated as better
        self.best_iteration = 0

    def __call__(self, env):
        # Ignore anything that does not carry an iteration counter
        if not hasattr(env, 'iteration'):
            return

        iteration = env.iteration + 1  # shown 1-based

        current_score = None
        results = getattr(env, 'evaluation_result_list', None)
        if results:
            result = results[0]
            if isinstance(result, tuple):
                # the value position is chosen by tuple length
                current_score = result[2] if len(result) >= 3 else result[1]
                # Track the best score on every iteration, not only when printing
                if current_score < self.best_score:
                    self.best_score = current_score
                    self.best_iteration = iteration

        # Only print at the configured frequency (and on the first iteration),
        # and only when a score could be read
        if iteration % self.print_freq != 0 and iteration != 1:
            return
        if current_score is None:
            return

        elapsed = time.time() - self.start_time
        speed = iteration / elapsed if elapsed > 0 else 0
        # Remaining time assumes the run goes all the way to env.end_iteration
        eta = (env.end_iteration - iteration) / speed if speed > 0 else 0
        no_improve = iteration - self.best_iteration

        print(f"  Iteration {iteration:5d}/{env.end_iteration} | "
              f"Train Loss: {current_score:.6f} | "
              f"Best: {self.best_score:.6f} @ {self.best_iteration} | "
              f"No improve: {no_improve} | "
              f"Speed: {speed:.1f} it/s | "
              f"ETA: {eta/60:.1f} min")

# Train the final model
print("\n開始訓練...")
start_time = time.time()

# Hold out a stratified 10% of the rows to monitor training
from sklearn.model_selection import train_test_split
X_train_final, X_val_final, y_train_final, y_val_final = train_test_split(
    X_final, y_final, test_size=0.1, random_state=42, stratify=y_final
)

train_data_final = lgb.Dataset(X_train_final, label=y_train_final)
valid_data_final = lgb.Dataset(X_val_final, label=y_val_final, reference=train_data_final)

try:
    # Primary path: evaluate on both sets, stop after 500 rounds without
    # improvement, and report through the custom progress callback
    monitored_sets = [train_data_final, valid_data_final]
    monitored_names = ['train', 'valid']
    primary_callbacks = [
        lgb.early_stopping(stopping_rounds=500, verbose=True),
        lgb.log_evaluation(0),
        FinalTrainingCallback(print_freq=200)
    ]
    final_model = lgb.train(
        lgb_params,
        train_data_final,
        valid_sets=monitored_sets,
        valid_names=monitored_names,
        num_boost_round=max_rounds,
        callbacks=primary_callbacks
    )
except Exception as e:
    print(f"訓練錯誤: {str(e)}")
    print("使用備用方法...")
    
    # Fallback: validation set only, built-in logging every 200 rounds
    fallback_callbacks = [
        lgb.early_stopping(stopping_rounds=500),
        lgb.log_evaluation(200)
    ]
    final_model = lgb.train(
        lgb_params,
        train_data_final,
        valid_sets=[valid_data_final],
        num_boost_round=max_rounds,
        callbacks=fallback_callbacks
    )

train_time = time.time() - start_time
print("\n訓練完成！")
print(f"訓練時間: {train_time/60:.2f} 分鐘")
print(f"最終迭代次數: {final_model.best_iteration}")
print(f"訓練速度: {final_model.best_iteration/train_time:.1f} 迭代/秒")

# Save the trained model
model_path = './model_output/final_lgb_model.pkl'
joblib.dump(final_model, model_path)
print(f"模型已保存至: {model_path}")

# Final evaluation on the held-out 10%
y_pred_final = final_model.predict(X_val_final, num_iteration=final_model.best_iteration)
y_pred_class_final = np.argmax(y_pred_final, axis=1)

final_metrics = (
    ('準確率', accuracy_score(y_val_final, y_pred_class_final)),
    ('F1分數', f1_score(y_val_final, y_pred_class_final, average='weighted')),
    ('平衡準確率', balanced_accuracy_score(y_val_final, y_pred_class_final)),
)
print("\n驗證集最終性能:")
for metric_label, metric_value in final_metrics:
    print(f"{metric_label}: {metric_value:.4f}")

# Overall wall-clock time, measured from the start of cross-validation
total_time = time.time() - total_start_time
print(f"\n總執行時間（含交叉驗證）: {total_time/60:.2f} 分鐘")

In [None]:
# ===========================
# Cell 6: 特徵重要性分析
# ===========================
print("\n" + "="*60)
print("特徵重要性分析")
print("="*60)

# Gain-based importance, one value per feature, sorted from most to least important
feature_importance = final_model.feature_importance(importance_type='gain')
feature_importance_df = pd.DataFrame(
    {'feature': feature_names, 'importance': feature_importance}
).sort_values('importance', ascending=False)

print("\nTop 15 重要特徵:")
top15 = feature_importance_df.head(15)
for feat_label, feat_score in zip(top15['feature'], top15['importance']):
    print(f"{feat_label:<25} {feat_score:>10.2f}")

# Horizontal bar chart of the 20 most important features, largest at the top
top_features = feature_importance_df.head(20)
bar_positions = range(len(top_features))
plt.figure(figsize=(10, 8))
plt.barh(bar_positions, top_features['importance'], color='lightblue')
plt.yticks(bar_positions, top_features['feature'])
plt.xlabel('重要性分數')
plt.title('LightGBM 特徵重要性 (Top 20)')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig('./model_output/feature_importance.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# ===========================
# Cell 7: 創建未來時空預測系統
# ===========================
class AccidentPredictor:
    """Scores a future space-time grid with a trained severity model.

    For every (time, latitude, longitude) grid point a synthetic feature
    vector is built from historical statistics and a weather scenario, the
    model's class probabilities are computed, and a weighted risk score is
    derived from them.

    The numeric features are passed through ``scaler`` before prediction so
    that the model sees inputs on the same scale it was trained on.  When no
    scaler is given, the one saved at ``DEFAULT_SCALER_PATH`` is loaded if
    it exists.

    Relies on the notebook-level names ``file_path`` and ``boolean_features``.
    """

    # Location where the preprocessing cell stores the fitted feature scaler
    DEFAULT_SCALER_PATH = './model_output/feature_scaler.pkl'

    def __init__(self, model, feature_names, scaler=None):
        """Store the model and feature order, resolve the scaler, load statistics.

        Args:
            model: trained model exposing ``predict(X, num_iteration=...)``
                and ``best_iteration``.
            feature_names: feature names in the column order the model expects.
            scaler: optional fitted scaler exposing ``feature_names_in_`` and
                ``transform``.  ``None`` means "load DEFAULT_SCALER_PATH if
                present, otherwise predict on unscaled features".

        Raises:
            ValueError: if an explicitly passed scaler cannot be mapped onto
                ``feature_names``.
        """
        self.model = model
        self.feature_names = feature_names
        self.scaler = self._resolve_scaler(scaler)
        
        # Historical statistics used to synthesize feature values
        self.load_historical_stats()
    
    def _resolve_scaler(self, scaler):
        """Return a scaler whose columns all occur in ``feature_names``, or None."""
        explicit = scaler is not None
        if not explicit:
            if not os.path.exists(self.DEFAULT_SCALER_PATH):
                print(f"警告: 找不到 {self.DEFAULT_SCALER_PATH}，預測時不會套用特徵縮放")
                return None
            try:
                # This pickle is the notebook's own artifact; never point
                # this at a file from an untrusted source
                scaler = joblib.load(self.DEFAULT_SCALER_PATH)
            except Exception as e:
                print(f"警告: 無法載入特徵縮放器 ({e})，預測時不會套用特徵縮放")
                return None
        
        scaler_columns = getattr(scaler, 'feature_names_in_', None)
        if scaler_columns is None:
            problem = "scaler has no feature_names_in_, cannot map it onto feature_names"
        else:
            missing = [col for col in scaler_columns if col not in self.feature_names]
            problem = f"scaler columns missing from feature_names: {missing}" if missing else None
        
        if problem is None:
            return scaler
        if explicit:
            raise ValueError(problem)
        print(f"警告: {problem}，預測時不會套用特徵縮放")
        return None
    
    def load_historical_stats(self, nrows=100000):
        """Compute location ranges, weather statistics and road-feature rates.

        Args:
            nrows: number of leading rows of ``file_path`` to read
                (default 100000, the previously hard-coded value).
        """
        print("\n載入歷史統計資訊...")
        
        # Only the first nrows rows of the CSV are used for the statistics
        df_stats = pd.read_csv(file_path, nrows=nrows)
        
        # Grid extent: 5th to 95th percentile of the observed coordinates
        self.lat_range = (df_stats['Start_Lat'].quantile(0.05), 
                         df_stats['Start_Lat'].quantile(0.95))
        self.lng_range = (df_stats['Start_Lng'].quantile(0.05), 
                         df_stats['Start_Lng'].quantile(0.95))
        
        # Mean and standard deviation per weather measurement
        weather_columns = [
            'Temperature(F)', 'Humidity(%)', 'Pressure(in)',
            'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)'
        ]
        self.weather_stats = {
            col: {'mean': df_stats[col].mean(), 'std': df_stats[col].std()}
            for col in weather_columns
        }
        
        # Share of rows in which each road feature is present
        self.road_feature_probs = {
            feat: df_stats[feat].mean()
            for feat in boolean_features
            if feat in df_stats.columns
        }
        
        print("統計資訊載入完成")
    
    def generate_prediction_grid(self, 
                               start_date, 
                               end_date, 
                               lat_points=20, 
                               lng_points=20,
                               hours_step=3,
                               weather_scenario='normal'):
        """Predict over every grid point between two dates.

        Args:
            start_date: first timestamp of the grid (datetime).
            end_date: last timestamp allowed in the grid (datetime, inclusive).
            lat_points: number of latitude grid points.
            lng_points: number of longitude grid points.
            hours_step: spacing between timestamps in hours; must be positive.
            weather_scenario: ``'normal'``, ``'bad'`` or ``'good'``.

        Returns:
            pandas DataFrame with one row per grid point.

        Raises:
            ValueError: if ``hours_step`` is not positive (the time loop
                would otherwise never terminate).
        """
        if hours_step <= 0:
            raise ValueError(f"hours_step must be positive, got {hours_step}")
        
        print(f"\n生成預測網格...")
        print(f"日期範圍: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
        print(f"空間網格: {lat_points} x {lng_points}")
        print(f"時間步長: {hours_step} 小時")
        print(f"天氣場景: {weather_scenario}")
        
        predictions = []
        
        # Time axis
        current_time = start_date
        time_points = []
        while current_time <= end_date:
            time_points.append(current_time)
            current_time += timedelta(hours=hours_step)
        
        # Spatial axes
        lats = np.linspace(self.lat_range[0], self.lat_range[1], lat_points)
        lngs = np.linspace(self.lng_range[0], self.lng_range[1], lng_points)
        
        total_predictions = len(time_points) * lat_points * lng_points
        print(f"總預測點數: {total_predictions:,}")
        
        # Feature vectors are collected and predicted in batches
        batch_size = 1000
        batch_features = []
        batch_info = []
        processed = 0
        
        for time_point in time_points:
            # Time features
            hour = time_point.hour
            day_of_week = time_point.weekday()
            month = time_point.month
            day_of_month = time_point.day
            year = time_point.year
            
            # One weather sample per timestamp, shared by all locations
            weather = self._get_weather_conditions(hour, month, weather_scenario)
            
            for lat in lats:
                for lng in lngs:
                    features = self._create_feature_vector(
                        lat, lng, hour, day_of_week, month, day_of_month, year, weather
                    )
                    
                    batch_features.append(features)
                    batch_info.append({
                        'latitude': lat,
                        'longitude': lng,
                        'timestamp': time_point,
                        'weather': weather
                    })
                    
                    # Predict as soon as a batch is full
                    if len(batch_features) >= batch_size:
                        self._process_batch(batch_features, batch_info, predictions)
                        processed += len(batch_features)
                        batch_features = []
                        batch_info = []
                        
                        # Progress report every 10000 points
                        if processed % 10000 == 0:
                            progress = processed / total_predictions * 100
                            print(f"  進度: {progress:.1f}%")
        
        # Remaining partial batch
        if batch_features:
            self._process_batch(batch_features, batch_info, predictions)
        
        print("預測完成！")
        return pd.DataFrame(predictions)
    
    def _get_weather_conditions(self, hour, month, scenario):
        """Sample one weather condition set for the given hour, month and scenario.

        Starts from the historical mean of each measurement, applies seasonal
        and night-time temperature offsets and the scenario adjustments, then
        adds Gaussian noise (10% of the historical std) and floors the result at 0.
        """
        weather = {}
        
        for condition, stats in self.weather_stats.items():
            base_value = stats['mean']
            std = stats['std']
            
            # Seasonal and time-of-day offsets apply to temperature only
            if condition == 'Temperature(F)':
                if month in [12, 1, 2]:  # winter
                    base_value -= 20
                elif month in [6, 7, 8]:  # summer
                    base_value += 20
                
                if hour >= 22 or hour <= 6:  # night
                    base_value -= 10
            
            # Scenario adjustments
            if scenario == 'bad':
                if condition == 'Visibility(mi)':
                    base_value *= 0.5
                elif condition == 'Precipitation(in)':
                    base_value = max(0.5, base_value * 3)
                elif condition == 'Wind_Speed(mph)':
                    base_value *= 1.5
            elif scenario == 'good':
                if condition == 'Visibility(mi)':
                    base_value = min(10, base_value * 1.5)
                elif condition == 'Precipitation(in)':
                    base_value = 0
                elif condition == 'Wind_Speed(mph)':
                    base_value *= 0.7
            
            # Small random variation; values are floored at 0
            weather[condition] = max(0, base_value + np.random.normal(0, std * 0.1))
        
        return weather
    
    def _create_feature_vector(self, lat, lng, hour, dow, month, dom, year, weather):
        """Build one unscaled feature vector in ``feature_names`` order.

        Location and calendar values are used directly; the derived time
        features use the same definitions as the preprocessing cell
        (weekend = day-of-week >= 5, rush hour = 06-09 or 16-19, season =
        ``(month % 12 + 3) // 3``).  Weather values come from ``weather``,
        road features are drawn at their historical rate, distance is drawn
        from an exponential distribution, and wind chill is estimated from
        the temperature.  Any other feature gets the placeholder ``-999``.
        """
        direct_values = {
            'Start_Lat': lat,
            'Start_Lng': lng,
            'Hour': hour,
            'DayOfWeek': dow,
            'Month': month,
            'DayOfMonth': dom,
            'Year': year,
            # Derived time features; these used to fall through to -999
            'IsWeekend': int(dow >= 5),
            'IsRushHour': int((6 <= hour <= 9) or (16 <= hour <= 19)),
            'Season': (month % 12 + 3) // 3,
        }
        
        features = np.zeros(len(self.feature_names))
        
        for i, feat_name in enumerate(self.feature_names):
            if feat_name in direct_values:
                features[i] = direct_values[feat_name]
            elif feat_name in weather:
                features[i] = weather[feat_name]
            elif feat_name in self.road_feature_probs:
                features[i] = 1 if np.random.random() < self.road_feature_probs[feat_name] else 0
            elif feat_name == 'Distance(mi)':
                features[i] = np.random.exponential(2)
            elif feat_name == 'Wind_Chill(F)':
                # Wind chill estimated from temperature: 5 degrees lower below 50F
                temp = weather.get('Temperature(F)', 70)
                features[i] = temp - 5 if temp < 50 else temp
            else:
                # NOTE(review): features without a rule here (e.g. the encoded
                # day/night columns) still receive this placeholder
                features[i] = -999
        
        return features
    
    def _scale_features(self, X_batch):
        """Apply the stored scaler to its columns of ``X_batch`` (modified in place).

        Returns ``X_batch`` unchanged when no scaler is configured.
        """
        scaler = getattr(self, 'scaler', None)
        if scaler is None:
            return X_batch
        
        scaler_columns = list(scaler.feature_names_in_)
        position_of = {name: i for i, name in enumerate(self.feature_names)}
        column_idx = [position_of[name] for name in scaler_columns]
        
        # Hand the scaler a frame with the column names and order it was fitted on
        frame = pd.DataFrame(X_batch[:, column_idx], columns=scaler_columns)
        X_batch[:, column_idx] = np.asarray(scaler.transform(frame))
        return X_batch
    
    def _process_batch(self, batch_features, batch_info, predictions):
        """Predict one batch and append one result dict per point to ``predictions``."""
        X_batch = self._scale_features(np.array(batch_features, dtype=float))
        
        # Class probabilities and most likely class
        pred_proba = self.model.predict(X_batch, num_iteration=self.model.best_iteration)
        pred_class = np.argmax(pred_proba, axis=1)
        
        # Risk score: probability-weighted sum, heavier weights for higher severity
        risk_weights = np.array([0.1, 0.3, 0.6, 1.0])
        risk_scores = np.sum(pred_proba * risk_weights, axis=1)
        
        # Assemble the output records
        for info, risk, severity, proba in zip(batch_info, risk_scores, pred_class, pred_proba):
            predictions.append({
                'latitude': round(info['latitude'], 6),
                'longitude': round(info['longitude'], 6),
                'timestamp': info['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
                'predicted_severity': int(severity + 1),
                'risk_score': round(float(risk), 4),
                'risk_category': self._get_risk_category(risk),
                'hour': info['timestamp'].hour,
                'day_of_week': info['timestamp'].weekday(),
                'day_name': info['timestamp'].strftime('%A'),
                'temperature': round(info['weather']['Temperature(F)'], 1),
                'visibility': round(info['weather']['Visibility(mi)'], 1),
                'precipitation': round(info['weather']['Precipitation(in)'], 2),
                'humidity': round(info['weather']['Humidity(%)'], 1),
                'wind_speed': round(info['weather']['Wind_Speed(mph)'], 1),
                'severity_1_prob': round(float(proba[0]), 4),
                'severity_2_prob': round(float(proba[1]), 4),
                'severity_3_prob': round(float(proba[2]), 4),
                'severity_4_prob': round(float(proba[3]), 4)
            })
    
    def _get_risk_category(self, risk_score):
        """Map a risk score to 'Low' (<0.25), 'Medium' (<0.5), 'High' (<0.75) or 'Very High'."""
        if risk_score < 0.25:
            return 'Low'
        elif risk_score < 0.5:
            return 'Medium'
        elif risk_score < 0.75:
            return 'High'
        else:
            return 'Very High'


In [None]:
# ===========================
# Cell 8: 生成未來預測
# ===========================
# Build the predictor from the trained model
predictor = AccidentPredictor(final_model, feature_names)

# Prediction window: the next 7 days starting now
start_date = datetime.now()
end_date = start_date + timedelta(days=7)

# One prediction grid per weather scenario
scenarios = ['normal', 'bad', 'good']
all_predictions = []

for scenario in scenarios:
    print(f"\n生成 {scenario} 天氣場景預測...")
    grid_settings = dict(
        start_date=start_date,
        end_date=end_date,
        lat_points=20,
        lng_points=20,
        hours_step=4,  # one prediction every 4 hours
        weather_scenario=scenario,
    )
    predictions = predictor.generate_prediction_grid(**grid_settings)
    predictions['weather_scenario'] = scenario
    all_predictions.append(predictions)

# Stack the scenarios into one table
future_predictions = pd.concat(all_predictions, ignore_index=True)
print(f"\n總預測數據量: {len(future_predictions):,}")

# Save the table as CSV
output_path = './model_output/future_predictions_kepler.csv'
future_predictions.to_csv(output_path, index=False)
print(f"預測數據已保存至: {output_path}")

# GeoJSON: one Point feature per row, coordinates as [longitude, latitude],
# every other column stored as a property
coordinate_columns = ('latitude', 'longitude')
features = [
    {
        "type": "Feature",
        "geometry": {
            "type": "Point",
            "coordinates": [row['longitude'], row['latitude']]
        },
        "properties": {key: value for key, value in row.items()
                      if key not in coordinate_columns}
    }
    for _, row in future_predictions.iterrows()
]

geojson = {"type": "FeatureCollection", "features": features}

geojson_path = './model_output/future_predictions_kepler.json'
with open(geojson_path, 'w') as f:
    json.dump(geojson, f)

print(f"GeoJSON 已保存至: {geojson_path}")

In [None]:
# ===========================
# Cell 9: Prediction summary and visualisation
# ===========================
# Prints summary statistics of the predictions and draws a 2x2 figure
# (hourly risk, risk-category share, risk by scenario, severity by scenario),
# which is also saved as a PNG.

# Summary statistics
print("\n" + "="*60)
print("預測摘要統計")
print("="*60)

# Statistics per weather scenario
scenario_stats = future_predictions.groupby('weather_scenario').agg({
    'risk_score': ['mean', 'std', 'min', 'max'],
    'predicted_severity': 'mean'
})
print("\n各天氣場景統計:")
print(scenario_stats)

# Statistics per hour of day
time_stats = future_predictions.groupby('hour')['risk_score'].mean()
print("\n高風險時段 (Top 5):")
print(time_stats.nlargest(5))

# Risk category distribution
risk_dist = future_predictions['risk_category'].value_counts()
print("\n風險類別分布:")
print(risk_dist)

# Visualisation
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Single source of truth for category order and colour, shared by the pie
# chart and the stacked bar chart so both use the same colour per category.
risk_order = ['Low', 'Medium', 'High', 'Very High']
colors = {'Low': 'green', 'Medium': 'yellow', 'High': 'orange', 'Very High': 'red'}

# 1. Mean risk score by hour, one line per scenario
ax1 = axes[0, 0]
for scenario in scenarios:
    data = future_predictions[future_predictions['weather_scenario'] == scenario]
    hourly_risk = data.groupby('hour')['risk_score'].mean()
    ax1.plot(hourly_risk.index, hourly_risk.values, marker='o', label=scenario)
ax1.set_xlabel('小時')
ax1.set_ylabel('平均風險分數')
ax1.set_title('24小時風險分布')
ax1.legend()
ax1.grid(True, alpha=0.3)

# 2. Risk category pie chart
ax2 = axes[0, 1]
risk_counts = future_predictions['risk_category'].value_counts()
ax2.pie(risk_counts.values, labels=risk_counts.index, autopct='%1.1f%%',
        colors=[colors[cat] for cat in risk_counts.index])
ax2.set_title('風險類別分布')

# 3. Risk categories per weather scenario
ax3 = axes[1, 0]
scenario_comparison = (
    future_predictions
    .groupby(['weather_scenario', 'risk_category'])
    .size()
    .unstack(fill_value=0)
)
# unstack() orders the columns alphabetically (High, Low, Medium, Very High),
# so a positional colour list would paint 'High' green and 'Low' yellow.
# Reorder the columns explicitly and look each colour up by category name;
# categories that never occur are simply left out.
present_categories = [cat for cat in risk_order if cat in scenario_comparison.columns]
scenario_comparison = scenario_comparison[present_categories]
scenario_comparison.plot(kind='bar', stacked=True, ax=ax3,
                         color=[colors[cat] for cat in present_categories])
ax3.set_xlabel('天氣場景')
ax3.set_ylabel('預測數量')
ax3.set_title('不同天氣場景的風險分布')
ax3.legend(title='風險類別')

# 4. Predicted severity per weather scenario
ax4 = axes[1, 1]
severity_dist = (
    future_predictions
    .groupby(['weather_scenario', 'predicted_severity'])
    .size()
    .unstack(fill_value=0)
)
severity_dist.plot(kind='bar', ax=ax4)
ax4.set_xlabel('天氣場景')
ax4.set_ylabel('預測數量')
ax4.set_title('預測嚴重程度分布')
# Derive the legend labels from the severity levels actually present; a fixed
# four-item list is applied positionally and mislabels the bars whenever a
# level is never predicted.
# NOTE(review): assumes predicted_severity stores the integer severity level
# itself (1-4) -- confirm against the predictor.
severity_labels = [f'Level {int(level)}' for level in severity_dist.columns]
ax4.legend(title='嚴重程度', labels=severity_labels)

plt.tight_layout()
plt.savefig('./model_output/prediction_summary.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# ===========================
# Cell 10: Save model information
# ===========================
# Collects training, performance and prediction metadata into one dict,
# writes it to model_info.json and prints a closing summary.

def _mean_with_spread(scores):
    """Format a collection of scores as 'mean (±std)' with four decimals."""
    return f"{np.mean(scores):.4f} (±{np.std(scores):.4f})"


training_info = {
    "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    "data_size": len(X),
    "feature_count": len(feature_names),
    "training_time_minutes": train_time/60
}

performance = {
    "cv_accuracy": _mean_with_spread(cv_scores['accuracy']),
    "cv_f1_score": _mean_with_spread(cv_scores['f1']),
    "cv_balanced_accuracy": _mean_with_spread(cv_scores['balanced_accuracy'])
}

prediction_info = {
    "total_predictions": len(future_predictions),
    "date_range": f"{start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}",
    "scenarios": scenarios,
    "spatial_resolution": f"{20} x {20} grid points"
}

# Key order matches the layout of the written JSON file.
model_info = {
    "model_type": "LightGBM",
    "training_info": training_info,
    "performance": performance,
    "features": feature_names,
    "hyperparameters": lgb_params,
    "prediction_info": prediction_info
}

with open('./model_output/model_info.json', 'w') as info_file:
    json.dump(model_info, info_file, indent=4)

# Closing summary: produced files followed by Kepler.gl usage hints.
banner = "="*60
closing_lines = [
    "\n" + banner,
    "完成！",
    banner,
    "\n產出檔案:",
    "1. final_lgb_model.pkl - 訓練好的模型",
    "2. future_predictions_kepler.csv - Kepler.gl CSV 格式",
    "3. future_predictions_kepler.json - Kepler.gl GeoJSON 格式",
    "4. feature_importance.png - 特徵重要性圖",
    "5. prediction_summary.png - 預測摘要圖",
    "6. model_info.json - 模型資訊",
    "\n使用 Kepler.gl:",
    "1. 前往 https://kepler.gl/",
    "2. 上傳 CSV 或 JSON 檔案",
    "3. 使用時間滑塊查看風險演變",
    "4. 根據 weather_scenario 篩選不同天氣",
    "5. 使用 risk_score 創建熱力圖",
]
for line in closing_lines:
    print(line)