In [4]:
# 1.讀入套件
import numpy as np
import pandas as pd
from collections import Counter
from typing import Optional, Union, Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')
# 用於輸出 Excel 檔案
from openpyxl import Workbook
from openpyxl.styles import PatternFill

# 2.資料預處理 
class AdultDataPreprocessor:
    
    def __init__(self, train_path: str, test_path: str):
        self.train_path = train_path
        self.test_path = test_path
        
        # Adult 資料集的欄位名稱
        self.column_names = [
            'age', 'workclass', 'fnlwgt', 'education', 'education-num',
            'marital-status', 'occupation', 'relationship', 'race', 'sex',
            'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income'
        ]
        
        # 類別型特徵
        self.categorical_features = [
            'workclass', 'education', 'marital-status', 'occupation',
            'relationship', 'race', 'sex', 'native-country'
        ]
        
        # 數值型特徵
        self.numerical_features = [
            'age', 'fnlwgt', 'education-num', 'capital-gain',
            'capital-loss', 'hours-per-week'
        ]
    
    def load_and_preprocess(self, filepath: str, is_test: bool = False) -> pd.DataFrame:
        # 讀取資料
        df = pd.read_csv(filepath, names=self.column_names, 
                         skipinitialspace=True, na_values='?')
        
        # 測試資料的第一行可能是註解，需要移除
        if is_test:
            df = df[~df['age'].astype(str).str.contains('\\|', na=False)].reset_index(drop=True)
            df['income'] = df['income'].str.rstrip('.')
        
        # 檢查並移除重複資料
        initial_rows = len(df)
        df = df.drop_duplicates()
        removed_duplicates = initial_rows - len(df)
        if removed_duplicates > 0:
            print(f"移除 {removed_duplicates} 筆重複資料")
        
        # 處理缺失值 - 使用眾數填補類別型特徵
        for col in self.categorical_features:
            if df[col].isnull().any():
                mode_value = df[col].mode()[0]
                df[col].fillna(mode_value, inplace=True)
                print(f"欄位 {col} 有缺失值，使用眾數 '{mode_value}' 填補")
        
        # 處理缺失值 - 使用中位數填補數值型特徵
        for col in self.numerical_features:
            if df[col].isnull().any():
                median_value = df[col].median()
                df[col].fillna(median_value, inplace=True)
                print(f"欄位 {col} 有缺失值，使用中位數 {median_value} 填補")
            
            # 強制將數值型特徵轉換為數值類型
            df[col] = pd.to_numeric(df[col], errors='coerce')
            if df[col].isnull().any():
                df[col].fillna(df[col].median(), inplace=True)
        
        # 標準化目標變數
        df['income'] = df['income'].map({'<=50K': 0, '>50K': 1})
        
        return df

    #preprocessing train data
    def preprocess_train(self) -> pd.DataFrame:
        print("=" * 50)
        print("處理訓練資料...")
        print("=" * 50)
        train_df = self.load_and_preprocess(self.train_path, is_test=False)
        print(f"訓練資料筆數: {len(train_df)}")
        print(f"特徵數量: {len(train_df.columns) - 1}")
        return train_df

    #preprocessing test data
    def preprocess_test(self) -> pd.DataFrame:
        print("\n" + "=" * 50)
        print("處理測試資料...")
        print("=" * 50)
        test_df = self.load_and_preprocess(self.test_path, is_test=True)
        print(f"測試資料筆數: {len(test_df)}")
        return test_df

# 3. C4.5 決策樹實作 
class C45Node:
    
    #初始化 C4.5 決策樹節點
    def __init__(self):
        self.feature = None
        self.threshold = None
        self.categories = None
        self.is_leaf = False
        self.prediction = None
        self.children = {}
        self.info_gain = 0.0
        self.samples = 0
        self.class_distribution = {}

class C45DecisionTree:
    #R決策樹初始化   
    def __init__(self, 
                 max_depth: Optional[int] = None,
                 min_samples_split: int = 2,
                 min_samples_leaf: int = 1,
                 max_nodes: Optional[int] = None):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_nodes = max_nodes
        self.root = None
        self.node_count = 0
        self.feature_names = None
        self.categorical_features = []
        self.numerical_features = []
    
    def _entropy(self, y: np.ndarray) -> float:
        #計算 entropy
        if len(y) == 0:
            return 0.0
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return -np.sum(probabilities * np.log2(probabilities + 1e-10))
    
    def _gain_ratio(self, y: np.ndarray, splits: List[np.ndarray]) -> float:
        #計算增益比率 (Gain Ratio)
        parent_entropy = self._entropy(y)
        total = len(y)
        
        # 計算加權子集熵和分割資訊
        weighted_entropy = 0.0
        split_info = 0.0
        
        for split in splits:
            if len(split) > 0:
                weight = len(split) / total
                weighted_entropy += weight * self._entropy(split)
                split_info -= weight * np.log2(weight + 1e-10)
        
        info_gain = parent_entropy - weighted_entropy
        return info_gain / split_info if split_info > 0 else 0.0
        
    """找出數值型特徵的最佳分割點"""
    def _best_split_numerical(self, X: pd.DataFrame, y: np.ndarray, 
                              feature: str) -> Tuple[float, float]:
        
        values = X[feature].values
        unique_values = np.unique(values)
        
        if len(unique_values) <= 1:
            return 0.0, None
        
        best_gain_ratio = 0.0
        best_threshold = None
        
        # 優化：如果唯一值過多，取樣候選分割點
        if len(unique_values) > 50:
            unique_values = np.percentile(unique_values, np.linspace(5, 95, 20))
        
        # 嘗試所有可能的分割點
        for i in range(len(unique_values) - 1):
            threshold = (unique_values[i] + unique_values[i + 1]) / 2
            
            left_mask = values <= threshold
            left_y, right_y = y[left_mask], y[~left_mask]
            
            # 檢查是否滿足最小葉節點樣本數
            if len(left_y) < self.min_samples_leaf or len(right_y) < self.min_samples_leaf:
                continue
            
            # 計算增益比率
            gain_ratio = self._gain_ratio(y, [left_y, right_y])
            
            if gain_ratio > best_gain_ratio:
                best_gain_ratio = gain_ratio
                best_threshold = threshold
        
        return best_gain_ratio, best_threshold
        
    """找出類別型特徵的最佳分割"""
    def _best_split_categorical(self, X: pd.DataFrame, y: np.ndarray, 
                                feature: str) -> Tuple[float, List]:
        
        values = X[feature].values
        unique_categories = np.unique(values)
        
        if len(unique_categories) <= 1:
            return 0.0, None
        
        # 對於類別型特徵，每個類別作為一個分支
        splits = [y[values == cat] for cat in unique_categories]
        
        # 檢查每個分支是否滿足最小葉節點樣本數
        if any(len(split) < self.min_samples_leaf for split in splits):
            return 0.0, None
        
        return self._gain_ratio(y, splits), list(unique_categories)
    
    """找出最佳分割特徵和分割點"""
    def _best_split(self, X: pd.DataFrame, y: np.ndarray) -> Tuple[str, Union[float, List], float]:
        
        best_gain_ratio = 0.0
        best_feature = None
        best_split_value = None
        
        # 遍歷所有特徵
        for feature in X.columns:
            if feature in self.categorical_features:
                gain_ratio, categories = self._best_split_categorical(X, y, feature)
            else:
                gain_ratio, categories = self._best_split_numerical(X, y, feature)
            
            if gain_ratio > best_gain_ratio:
                best_gain_ratio = gain_ratio
                best_feature = feature
                best_split_value = categories
        
        return best_feature, best_split_value, best_gain_ratio
        
    """遞迴建立決策樹"""
    def _build_tree(self, X: pd.DataFrame, y: np.ndarray, depth: int = 0) -> C45Node:
        
        node = C45Node()
        node.samples = len(y)
        node.class_distribution = dict(Counter(y))
        self.node_count += 1
        
        # 停止條件
        most_common = Counter(y).most_common(1)[0][0]
        
        if (len(np.unique(y)) == 1 or
            (self.max_depth is not None and depth >= self.max_depth) or
            len(y) < self.min_samples_split or
            (self.max_nodes is not None and self.node_count >= self.max_nodes)):
            node.is_leaf = True
            node.prediction = most_common
            return node
        
        # 找出最佳分割
        best_feature, best_split_value, best_gain_ratio = self._best_split(X, y)
        
        if best_feature is None or best_gain_ratio <= 0:
            node.is_leaf = True
            node.prediction = most_common
            return node
        
        # 建立內部節點
        node.feature = best_feature
        node.info_gain = best_gain_ratio
        
        # 根據特徵類型進行分割
        if best_feature in self.categorical_features:
            node.categories = best_split_value
            for category in best_split_value:
                mask = X[best_feature] == category
                if mask.sum() > 0:
                    node.children[category] = self._build_tree(X[mask], y[mask], depth + 1)
        else:
            node.threshold = best_split_value
            left_mask = X[best_feature] <= best_split_value
            
            if left_mask.sum() > 0:
                node.children['left'] = self._build_tree(X[left_mask], y[left_mask], depth + 1)
            if (~left_mask).sum() > 0:
                node.children['right'] = self._build_tree(X[~left_mask], y[~left_mask], depth + 1)
        
        return node
        
    """訓練決策樹"""
    def fit(self, X: pd.DataFrame, y: np.ndarray, 
            categorical_features: List[str], 
            numerical_features: List[str]):
        
        self.feature_names = list(X.columns)
        self.categorical_features = categorical_features
        self.numerical_features = numerical_features
        self.node_count = 0
        
        print("\n開始訓練 C4.5 決策樹...")
        self.root = self._build_tree(X, y)
        print(f"訓練完成：總節點數: {self.node_count}")
    
    """預測單一樣本"""
    def _predict_sample(self, x: pd.Series, node: C45Node) -> int:
        
        if node.is_leaf:
            return node.prediction
        
        feature_value = x[node.feature]
        
        if node.feature in self.categorical_features:
            if feature_value in node.children:
                return self._predict_sample(x, node.children[feature_value])
            return Counter(node.class_distribution).most_common(1)[0][0]
        else:
            try:
                if float(feature_value) <= float(node.threshold):
                    if 'left' in node.children:
                        return self._predict_sample(x, node.children['left'])
                else:
                    if 'right' in node.children:
                        return self._predict_sample(x, node.children['right'])
            except (TypeError, ValueError):
                pass
            
            return Counter(node.class_distribution).most_common(1)[0][0]
    
    """預測多個樣本"""
    def predict(self, X: pd.DataFrame) -> np.ndarray:
        
        if self.root is None:
            raise ValueError("模型尚未訓練，請先呼叫 fit() 方法")
        
        return np.array([self._predict_sample(row, self.root) for _, row in X.iterrows()])

# 4.主程式以及評估模型 

def evaluate_model(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
    """評估模型性能"""
    tp = np.sum((y_true == 1) & (y_pred == 1))
    tn = np.sum((y_true == 0) & (y_pred == 0))
    fp = np.sum((y_true == 0) & (y_pred == 1))
    fn = np.sum((y_true == 1) & (y_pred == 0))
    
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'true_positive': tp,
        'true_negative': tn,
        'false_positive': fp,
        'false_negative': fn
    }

def export_to_excel(y_true: np.ndarray, y_pred: np.ndarray, filename: str = 'predictions.xlsx'):
    """將預測結果匯出到 Excel 檔案"""
    wb = Workbook()
    ws = wb.active
    ws.title = "Prediction Results"
    
    # 設定標題
    ws['A1'] = '實際類別'
    ws['B1'] = '預測類別'
    ws['A1'].fill = PatternFill(start_color="FFD700", end_color="FFD700", fill_type="solid")
    ws['B1'].fill = PatternFill(start_color="FFD700", end_color="FFD700", fill_type="solid")
    
    # 定義顏色
    green_fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid")
    red_fill = PatternFill(start_color="FFB6C6", end_color="FFB6C6", fill_type="solid")
    
    # 標籤映射
    label_map = {0: '<=50K', 1: '>50K'}
    
    # 寫入資料
    for i, (true_val, pred_val) in enumerate(zip(y_true, y_pred), start=2):
        ws[f'A{i}'] = label_map[true_val]
        ws[f'B{i}'] = label_map[pred_val]
        
        # 根據預測是否正確標示顏色
        fill = green_fill if true_val == pred_val else red_fill
        ws[f'A{i}'].fill = fill
        ws[f'B{i}'].fill = fill
    
    wb.save(filename)
    print(f"\n預測結果已匯出至: {filename}")

def print_metrics(metrics,title=""):
    """列印評估指標"""
    print("\n" + "=" * 50)
    if title:
        print(title.center(40))
    print("=" * 50)
    print(f"  {'準確率 (Accuracy)':<25}: {metrics['accuracy'] * 100:.2f}%")
    print(f"  {'精確度 (Precision)':<25}: {metrics['precision'] * 100:.2f}%")
    print(f"  {'召回率 (Recall)':<25}: {metrics['recall'] * 100:.2f}%")
    print(f"  {'F1-Score':<25}: {metrics['f1_score'] * 100:.2f}%")
    print("\n--- Confusion Matrix ---".center(50))
    print(f"  {'真陽性 (True Positive, TP)':<25}: {metrics['true_positive']:<10}")
    print(f"  {'真陰性 (True Negative, TN)':<25}: {metrics['true_negative']:<10}")
    print(f"  {'偽陽性 (False Positive, FP)':<25}: {metrics['false_positive']:<10}")
    print(f"  {'偽陰性 (False Negative, FN)':<25}: {metrics['false_negative']:<10}")

def main():    
    # 設定資料集路徑
    TRAIN_PATH = 'adult/adult.data'
    TEST_PATH = 'adult/adult.test'
    
    # 載入並預處理資料
    preprocessor = AdultDataPreprocessor(TRAIN_PATH, TEST_PATH)
    train_df = preprocessor.preprocess_train()
    test_df = preprocessor.preprocess_test()
    
    # 分離特徵和目標變數
    X_train = train_df.drop('income', axis=1)
    y_train = train_df['income'].values
    X_test = test_df.drop('income', axis=1)
    y_test = test_df['income'].values
    
    print("\n" + "=" * 60)
    print("資料集資訊")
    print("=" * 60)
    print(f"訓練集樣本數: {len(X_train)}")
    print(f"測試集樣本數: {len(X_test)}")
    print(f"特徵數量: {X_train.shape[1]}")
    
    # 格式化類別分布的輸出
    label_map_display = {0: '<=50K', 1: '>50K'}
    train_dist = {label_map_display.get(k, k): v for k, v in Counter(y_train).items()}
    test_dist = {label_map_display.get(k, k): v for k, v in Counter(y_test).items()}
    print(f"類別分布 (訓練集): {train_dist}")
    print(f"類別分布 (測試集): {test_dist}")

    # C4.5 模型的超參數
    # 這些參數用於預剪枝 (Pre-pruning)
    params = {
        'max_depth':20,
        'min_samples_split': 700,
        'min_samples_leaf': 50
    }
    
    # 訓練模型
    model = C45DecisionTree(**params)
    model.fit(
        X_train, 
        y_train,
        categorical_features=preprocessor.categorical_features,
        numerical_features=preprocessor.numerical_features
    )
    
    # 預測
    y_pred_train = model.predict(X_train)
    y_pred_test = model.predict(X_test)
    
    # 評估並列印結果
    train_metrics = evaluate_model(y_train, y_pred_train)
    print_metrics(train_metrics, "訓練資料性能指標")
    
    test_metrics = evaluate_model(y_test, y_pred_test)
    print_metrics(test_metrics, "測試資料性能指標")
    
    # 匯出結果到 Excel
    export_to_excel(y_test, y_pred_test, 'adult_predictions_C45.xlsx')

if __name__ == "__main__":
    main()

處理訓練資料...
移除 24 筆重複資料
欄位 workclass 有缺失值，使用眾數 'Private' 填補
欄位 occupation 有缺失值，使用眾數 'Prof-specialty' 填補
欄位 native-country 有缺失值，使用眾數 'United-States' 填補
訓練資料筆數: 32537
特徵數量: 14

處理測試資料...
移除 5 筆重複資料
欄位 workclass 有缺失值，使用眾數 'Private' 填補
欄位 occupation 有缺失值，使用眾數 'Prof-specialty' 填補
欄位 native-country 有缺失值，使用眾數 'United-States' 填補
測試資料筆數: 16276

資料集資訊
訓練集樣本數: 32537
測試集樣本數: 16276
特徵數量: 14
類別分布 (訓練集): {'<=50K': 24698, '>50K': 7839}
類別分布 (測試集): {'<=50K': 12430, '>50K': 3846}

開始訓練 C4.5 決策樹...
訓練完成：總節點數: 199

                訓練資料性能指標                
  準確率 (Accuracy)           : 85.39%
  精確度 (Precision)          : 73.76%
  召回率 (Recall)             : 61.09%
  F1-Score                 : 66.83%
            
--- Confusion Matrix ---             
  真陽性 (True Positive, TP)  : 4789      
  真陰性 (True Negative, TN)  : 22994     
  偽陽性 (False Positive, FP) : 1704      
  偽陰性 (False Negative, FN) : 3050      

                測試資料性能指標                
  準確率 (Accuracy)           : 85.22%
  精確度 (Precision)          