In [None]:
# %%
import csv
import os
import cv2
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import seaborn as sns
from PIL import Image 
from pathlib import Path
import visualkeras
import mlflow
import mlflow.keras
from tensorflow.keras.applications import mobilenet_v3, efficientnet
import itertools 
from sklearn.utils.class_weight import compute_class_weight
import keras_cv


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import regularizers, layers, models
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau,ModelCheckpoint 
from tensorflow.keras import optimizers
from tensorflow.keras import regularizers

# from keras.optimizers import Adam
# from keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import warnings 

warnings.filterwarnings('ignore')

In [None]:
# %%
# 1. 建立模型的函式 (最終穩健版：繞過 Keras 3 的 input_tensor Bug)
def build_model(model_name, learning_rate, dropout_rate1, dropout_rate2, dense_units, trainable_layers, num_classes, 
                freeze_base_model=False, loss_function='categorical_crossentropy'):
    """
    根據傳入的超參數建立並編譯指定的 Keras 模型。
    *** 修正：改變模型構建方式，以避開 Keras 3 在使用 input_tensor 時的 shape inference bug ***
    """
    input_shape = (224, 224, 3) 
    
    # --- 步驟 1：像之前一樣，定義輸入和數據增強層 ---
    inputs = keras.Input(shape=input_shape)

    
    data_augmentation = keras.Sequential(
        [
            layers.RandomFlip("horizontal"),
            layers.RandomRotation(0.15),
            layers.RandomTranslation(height_factor=0.1, width_factor=0.1),
            layers.RandomErasing(scale=(0.02, 0.1), value_range=[0, 255]),
        ],
        name="data_augmentation",
    )

    # --- 步驟 2：關鍵修改 - 先獨立創建基礎模型，不傳入 input_tensor ---
    if model_name == 'MobileNetV3-Large':
        # 注意：我們不再使用 input_tensor 參數
        base_model = keras.applications.MobileNetV3Large(
            input_shape=input_shape, include_top=False, weights='imagenet'
        )
    elif model_name == 'EfficientNet-B0':
        # 注意：我們不再使用 input_tensor 參數
        base_model = keras.applications.EfficientNetB0(
            input_shape=input_shape, include_top=False, weights='imagenet'
        )
    else:
        raise ValueError(f"不支援的模型名稱: {model_name}。")
        
    # --- 步驟 3：設定基礎模型的可訓練性 ---
    if freeze_base_model:
        base_model.trainable = False
    else:
        base_model.trainable = True

    # --- 步驟 4：手動連接計算圖 (Graph) ---
    # 1. 原始輸入 -> 數據增強
    x = data_augmentation(inputs)
    
    # 2. 數據增強的輸出 -> 基礎模型
    #    在凍結階段，BN層應在推斷模式下運行，所以 training=False
    x = base_model(x, training=False) 
    
    # 3. 基礎模型的輸出 -> 全局池化和分類頭
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(dense_units, activation='relu')(x)
    x = layers.Dropout(dropout_rate1)(x)
    x = layers.Dense(dense_units, activation='relu')(x)
    x = layers.Dropout(dropout_rate2)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    # --- 步驟 5：創建最終模型 ---
    model = Model(inputs=inputs, outputs=outputs)
    
    # --- 步驟 6：編譯模型 ---
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss="categorical_crossentropy", 
        metrics=['accuracy']
    )
    
    return model, base_model

In [70]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import mlflow
from sklearn.metrics import (
    confusion_matrix, 
    classification_report,
    precision_recall_curve, 
    average_precision_score, 
    roc_curve, 
    auc
)
from itertools import cycle

# 1. MLflow 視覺化與日誌記錄輔助函式 (維持不變)
def plot_and_log_history(history, filename="history_plots.png"):
    """
    繪製準確率和損失函數的歷史曲線，並將其儲存為圖片，最後記錄到 MLflow。
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # 準確率圖
    ax1.plot(history.history['accuracy'], label='Train Accuracy')
    ax1.plot(history.history['val_accuracy'], label='Val Accuracy')
    ax1.set_ylim(0, 1)
    ax1.set_title('Accuracy over epochs')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy')
    ax1.legend()
    ax1.grid(True)
    
    # 損失函數圖
    ax2.plot(history.history['loss'], label='Train Loss')
    ax2.plot(history.history['val_loss'], label='Val Loss')
    ax2.set_title('Loss over epochs')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    ax2.grid(True)
    
    # 儲存圖片並關閉繪圖
    plt.tight_layout()
    plt.savefig(filename)
    plt.close(fig)
    
    # 將圖片記錄到 MLflow
    mlflow.log_artifact(filename)
    print(f"✅ '{filename}' has been logged to MLflow.")

def log_confusion_matrix(y_true, y_pred, class_labels, filename="confusion_matrix.png"):
    """ 繪製、儲存並記錄標準混淆矩陣 """
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(18, 14))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_labels, yticklabels=class_labels, cmap='Blues')
    plt.xlabel('Predicted Class')
    plt.ylabel('True Class')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.savefig(filename)
    plt.close()
    mlflow.log_artifact(filename)
    print(f"✅ '{filename}' has been logged to MLflow.")

def plot_and_log_sorted_confusion_bar(y_true, y_pred, class_labels, filename="sorted_correct_counts.png"):
    """ 計算、繪製並記錄一個長條圖，顯示每個類別的正確預測數量（由高到低排序）。 """
    cm = confusion_matrix(y_true, y_pred)
    correct_counts = np.diag(cm)
    
    sorted_idx = np.argsort(correct_counts)[::-1]
    sorted_counts = correct_counts[sorted_idx]
    sorted_labels = np.array(class_labels)[sorted_idx]

    plt.figure(figsize=(18, 8))
    plt.bar(range(len(sorted_counts)), sorted_counts, color='lightgreen')
    plt.xticks(range(len(sorted_labels)), sorted_labels, rotation=90)
    plt.xlabel('Class')
    plt.ylabel('Correctly Predicted Count')
    plt.title('Correctly Predicted Count per Class (sorted)')
    plt.tight_layout()
    
    plt.savefig(filename)
    plt.close()

    mlflow.log_artifact(filename)
    print(f"✅ '{filename}' has been logged to MLflow.")

# 2. 【已修正】的 PR 和 ROC 曲線函式
def plot_and_log_precision_recall_curves(y_true, y_probas, class_labels, filename="precision_recall_curves.png"):
    """
    為每個類別繪製 Precision-Recall 曲線。
    特別突顯表現最差的5個類別，並將圖表記錄到 MLflow。
    """
    # 步驟 1: 計算每個類別的 Average Precision (AP) 分數
    class_aps = []
    for i in range(len(class_labels)):
        ap = average_precision_score(y_true == i, y_probas[:, i])
        class_aps.append((ap, i))

    # 步驟 2: 根據 AP 分數進行排序（由低到高），找出最差的5個
    class_aps.sort(key=lambda x: x[0])
    worst_classes_indices = {idx for _, idx in class_aps[:5]}

    # 步驟 3: 繪圖
    plt.figure(figsize=(15, 10))
    colors = cycle(['navy', 'turquoise', 'darkorange', 'cornflowerblue', 'teal']) # 為最差的5個類別準備顏色

    # 繪製所有類別的曲線
    for i, class_label in enumerate(class_labels):
        precision, recall, _ = precision_recall_curve(y_true == i, y_probas[:, i])
        ap_score = class_aps[i][0] if class_aps[i][1] == i else [s for s, j in class_aps if j == i][0] # 找到對應的AP

        if i in worst_classes_indices:
            # 如果是表現最差的類別之一，使用彩色線條並加入圖例
            plt.plot(recall, precision, lw=2.5, color=next(colors),
                     label=f'PR curve for {class_label} (AP = {ap_score:0.3f})')
        else:
            # 其他類別使用灰色線條，不加入圖例
            plt.plot(recall, precision, lw=1.5, color='lightgray')

    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Multi-class Precision-Recall Curve (Highlighting 5 worst classes)")
    plt.legend(loc="best") # 顯示圖例
    plt.grid(True)
    plt.tight_layout()
    
    plt.savefig(filename)
    plt.close()
    mlflow.log_artifact(filename)
    print(f"✅ '{filename}' has been logged to MLflow.")

def plot_and_log_roc_curves(y_true, y_probas, class_labels, filename="roc_curves.png"):
    """
    為每個類別繪製 ROC 曲線。
    特別突顯表現最差的5個類別，並將圖表記錄到 MLflow。
    """
    # 步驟 1: 計算每個類別的 AUC 分數
    class_aucs = []
    for i in range(len(class_labels)):
        fpr, tpr, _ = roc_curve(y_true == i, y_probas[:, i])
        roc_auc = auc(fpr, tpr)
        class_aucs.append((roc_auc, i))
    
    # 步驟 2: 根據 AUC 分數進行排序（由低到高），找出最差的5個
    class_aucs.sort(key=lambda x: x[0])
    worst_classes_indices = {idx for _, idx in class_aucs[:5]}
    
    # 步驟 3: 繪圖
    plt.figure(figsize=(15, 10))
    colors = cycle(['navy', 'turquoise', 'darkorange', 'cornflowerblue', 'teal']) # 為最差的5個類別準備顏色

    # 繪製所有類別的曲線
    for i, class_label in enumerate(class_labels):
        fpr, tpr, _ = roc_curve(y_true == i, y_probas[:, i])
        roc_auc = auc(fpr, tpr)
        
        if i in worst_classes_indices:
            # 如果是表現最差的類別之一，使用彩色線條並加入圖例
            plt.plot(fpr, tpr, lw=2.5, color=next(colors),
                     label=f'ROC curve for {class_label} (AUC = {roc_auc:0.3f})')
        else:
            # 其他類別使用灰色線條，不加入圖例
            plt.plot(fpr, tpr, lw=1.5, color='lightgray')

    plt.plot([0, 1], [0, 1], 'k--', lw=2, label='Random Guess')
    
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("Multi-class ROC Curve (Highlighting 5 worst classes)")
    plt.legend(loc="lower right") # 顯示圖例
    plt.grid(True)
    plt.tight_layout()
    
    plt.savefig(filename)
    plt.close()
    mlflow.log_artifact(filename)
    print(f"✅ '{filename}' has been logged to MLflow.")


# 3. 更新後的主評估函式
def evaluate_and_log_all_reports(model, data_generator, class_labels):
    """
    評估模型並記錄分類報告、混淆矩陣及其他分析圖（包括 PR 和 ROC 曲線）到 MLflow。
    """
    print("\n" + "="*20 + " Starting Full Evaluation for Logging " + "="*20)

    data_generator.reset()

    y_probas = model.predict(data_generator, steps=len(data_generator), verbose=1)
    y_pred = np.argmax(y_probas, axis=1)
    y_true = data_generator.classes
    
    if len(y_pred) != len(y_true):
        print(f"⚠️ y_pred length {len(y_pred)} vs y_true length {len(y_true)}. This is normal if batch_size doesn't divide total samples.")
        y_true = y_true[:len(y_pred)]

    report = classification_report(y_true, y_pred, target_names=class_labels, digits=4, zero_division=0)
    print("\nClassification Report:\n", report)
    mlflow.log_text(report, "classification_report.txt")
    print("✅ 'classification_report.txt' has been logged to MLflow.")

    log_confusion_matrix(y_true, y_pred, class_labels)
    plot_and_log_sorted_confusion_bar(y_true, y_pred, class_labels)
    plot_and_log_precision_recall_curves(y_true, y_probas, class_labels)
    plot_and_log_roc_curves(y_true, y_probas, class_labels)
    
    print("="*20 + " Full Evaluation Logging Complete " + "="*20 + "\n")

### 兩種模型

In [73]:
# %%
# 2. 自動訓練與調參的主函式 (全面優化版)
def train_and_tune(train_path, validation_path, target_accuracy):
    """
    自動執行兩階段訓練和超參數調整。
    實作了多項遷移學習最佳實踐。
    """
    param_space = {
        'model_name': [
            'MobileNetV3-Large', 
            # 'EfficientNet-B0'
        ],
        'learning_rate': [0.001],
        'dropout_rate1': [0.5],
        'dropout_rate2': [0.3],
        'dense_units': [256],
        'trainable_layers': [100],

        # 'learning_rate': [0.001],
        # 'dropout_rate': [0.3],
        # 'dense_units': [256],
        # 'trainable_layers': [80]
    }
    
    STAGE_1_EPOCHS = 30
    STAGE_2_EPOCHS = 50
    FINE_TUNE_LR_MULTIPLIER = 0.1
    
    keys, values = zip(*param_space.items())
    param_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]
    
    best_val_accuracy = 0
    best_run_id = None

    mlflow.set_experiment("MobileNetV3新2階段 RandomErasing 60種(資料夾7)")
    
    for i, params in enumerate(param_combinations):

        if best_val_accuracy >= target_accuracy:
            print(f"🎯 目標已達成 (Accuracy >= {target_accuracy:.2f})。停止搜尋。")
            break

        print("\n" + "=" * 70)
        print(f"🚀 Run {i+1}/{len(param_combinations)}: 嘗試參數組合: {params}")

        if params['model_name'] == 'MobileNetV3-Large':
            preprocessing_function = mobilenet_v3.preprocess_input
        elif params['model_name'] == 'EfficientNet-B0':
            preprocessing_function = efficientnet.preprocess_input
        else:
            raise ValueError(f"未知的模型: {params['model_name']}")

        # train_datagen_aug = ImageDataGenerator(
        #     preprocessing_function=preprocessing_function,
        #     rotation_range=15,
        #     width_shift_range=0.1,
        #     height_shift_range=0.1,
        #     zoom_range=0.1,
        #     horizontal_flip=True,
        #     # ### 優化 2：為避免數據分布偏移，建議優先移除亮度增強 ###
        #     # brightness_range=[0.9, 1.1],
        #     fill_mode='nearest'
        # )

        train_datagen_simple  = ImageDataGenerator(preprocessing_function=preprocessing_function)

        validation_datagen = ImageDataGenerator(preprocessing_function=preprocessing_function)

        train_gen = train_datagen_simple.flow_from_directory(
            train_path, target_size=(224, 224), batch_size=32, class_mode='categorical', shuffle=True
        )
        val_gen = validation_datagen.flow_from_directory(
            validation_path, target_size=(224, 224), batch_size=32, class_mode='categorical', shuffle=False
        )

        num_classes = train_gen.num_classes
        class_labels = list(train_gen.class_indices.keys())

        with open('classes_new.csv', 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for cls in class_labels:
                writer.writerow([cls])
        
        class_indices = np.unique(train_gen.classes)
        class_weights_array = compute_class_weight('balanced', classes=class_indices, y=train_gen.classes)
        class_weight = dict(zip(class_indices, class_weights_array))

        with mlflow.start_run(run_name=f"{params['model_name']}_run_{i+1}") as run:
            mlflow.log_params(params)

            # STAGE 1: Feature Extraction
            print("\n" + "-" * 20 + " STAGE 1: Feature Extraction " + "-" * 20)
            
            # ### 優化 1：接收 model 和 base_model ###
            model, base_model = build_model(
                **params, num_classes=num_classes, freeze_base_model=True
            )
            
            history_stage1 = model.fit(
                train_gen, epochs=STAGE_1_EPOCHS, validation_data=val_gen,
                callbacks=[EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)],
                class_weight=class_weight, verbose=1
            )
            
            # STAGE 2: Fine-Tuning
            print("\n" + "-" * 20 + " STAGE 2: Fine-Tuning " + "-" * 20)

            # --- 直接在現有 model 物件上操作 ---
            base_model.trainable = True
            
            trainable_layers_count = params['trainable_layers']
            if trainable_layers_count > 0 and trainable_layers_count < len(base_model.layers):
                print(f"🔥 Fine-tuning: Unfreezing the last {trainable_layers_count} layers.")
                for layer in base_model.layers[:-trainable_layers_count]:
                    layer.trainable = False
            else:
                print("🔥 Fine-tuning: Unfreezing all base model layers.")
            
            # ### 優化 3a：在微調時凍結 BatchNormalization 層 ###
            for layer in base_model.layers:
                if isinstance(layer, layers.BatchNormalization):
                    layer.trainable = False
            print("🧊 All BatchNormalization layers in the base model have been frozen.")

            # 用更低的學習率重新編譯模型
            fine_tune_lr = params['learning_rate'] * FINE_TUNE_LR_MULTIPLIER
            model.compile(
                optimizer=tf.keras.optimizers.Adam(learning_rate=fine_tune_lr),
                loss="categorical_crossentropy",
                metrics=['accuracy']
            )
            
            callbacks_stage2 = [
                EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
                ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-9),
                ModelCheckpoint("model_checkpoint.keras", monitor="val_loss", save_best_only=True)
            ]
            history_stage2 = model.fit(
                train_gen, epochs=STAGE_2_EPOCHS, validation_data=val_gen,
                class_weight=class_weight, callbacks=callbacks_stage2, verbose=1
            )
            
            # --- MLflow 記錄 ---
            full_history = {}
            for key in history_stage1.history.keys():
                full_history[key] = history_stage1.history[key] + history_stage2.history[key]
            
            # ### 優化 3b：計算整個訓練過程中的最佳驗證準確率 ###
            # 確保 history 字典不是空的
            best_s1_acc = max(history_stage1.history.get('val_accuracy', [0]))
            best_s2_acc = max(history_stage2.history.get('val_accuracy', [0]))
            current_run_best_val_accuracy = max(best_s1_acc, best_s2_acc)
            
            mlflow.log_metric("best_val_accuracy", current_run_best_val_accuracy)
            
            plot_and_log_history(type('History', (), {'history': full_history})())
            evaluate_and_log_all_reports(model, val_gen, class_labels)
            mlflow.keras.log_model(model, "model")
            
            print(f"✔️ Run {run.info.run_id} 完成。本輪最佳驗證準確率: {current_run_best_val_accuracy:.4f}")
            
            if current_run_best_val_accuracy > best_val_accuracy:
                best_val_accuracy = current_run_best_val_accuracy
                best_run_id = run.info.run_id
                print(f"🎉 新的最佳模型! Model: {params['model_name']}, Run ID: {best_run_id}, Accuracy: {best_val_accuracy:.4f}")
                model.save(f'checkpoints/bestmodel/{i+1}{params['model_name']}_{best_val_accuracy:.4f}.keras')
                print(f"💾 最佳模型已更新並儲存為 '{i+1}{params['model_name']}_{best_val_accuracy:.4f}.keras'")

    print("=" * 70)
    print("🏁 自動調參完成。")
    if best_run_id:
        print(f"🏆 最終最佳模型的 Run ID 為: {best_run_id}")
        print(f"🏆 最終最佳驗證準確率為: {best_val_accuracy:.4f}")
    else:
        print("❌ 未能成功訓練任何模型。")

In [74]:
# %%
# 3. 執行主流程 (已更新)

# --- 定義路徑 ---
train_path = "dataset_full_en_aug7_56_new/train"
validation_path = "dataset_full_en_aug7_56_new/validation"

# --- 啟動自動訓練與調參 ---
if __name__ == '__main__':
    # 函式內部會處理數據生成器、類別數量和權重計算
    train_and_tune(
        train_path=train_path,
        validation_path=validation_path,
        target_accuracy=0.98
    )

2025/08/24 22:36:33 INFO mlflow.tracking.fluent: Experiment with name 'MobileNetV3新2階段 RandomErasing 60種(資料夾7)' does not exist. Creating a new experiment.



🚀 Run 1/1: 嘗試參數組合: {'model_name': 'MobileNetV3-Large', 'learning_rate': 0.001, 'dropout_rate1': 0.5, 'dropout_rate2': 0.3, 'dense_units': 256, 'trainable_layers': 100}
Found 34279 images belonging to 56 classes.
Found 6720 images belonging to 56 classes.

-------------------- STAGE 1: Feature Extraction --------------------
Epoch 1/30
[1m1072/1072[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m467s[0m 431ms/step - accuracy: 0.4387 - loss: 1.9668 - val_accuracy: 0.7189 - val_loss: 0.9160
Epoch 2/30
[1m1072/1072[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m389s[0m 363ms/step - accuracy: 0.6077 - loss: 1.2930 - val_accuracy: 0.7574 - val_loss: 0.7851
Epoch 3/30
[1m1072/1072[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m393s[0m 367ms/step - accuracy: 0.6589 - loss: 1.1245 - val_accuracy: 0.7756 - val_loss: 0.7239
Epoch 4/30
[1m1072/1072[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m389s[0m 363ms/step - accuracy: 0.6807 - loss: 1.0444 - val_accuracy: 0.7923 - val_loss: 0.6970
E



✅ 'roc_curves.png' has been logged to MLflow.





✔️ Run b54019b8cb5b45148a17f75963967e55 完成。本輪最佳驗證準確率: 0.9179
🎉 新的最佳模型! Model: MobileNetV3-Large, Run ID: b54019b8cb5b45148a17f75963967e55, Accuracy: 0.9179
💾 最佳模型已更新並儲存為 '1MobileNetV3-Large_0.9179.keras'
🏁 自動調參完成。
🏆 最終最佳模型的 Run ID 為: b54019b8cb5b45148a17f75963967e55
🏆 最終最佳驗證準確率為: 0.9179
