<a href="https://colab.research.google.com/github/xin-kai08/Machine-Learning-Models/blob/main/%E6%A9%9F%E5%99%A8%E5%AD%B8%E7%BF%92.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
from google.colab import drive
import os

# 掛載 Google 雲端硬碟
drive.mount('/content/drive')

# 資料集根目錄
BASE_PATH = "/content/drive/MyDrive/Colab Notebooks/dataset"

# 各分類資料夾設定
LABEL_DIRS = {
    0: os.path.join(BASE_PATH, "normal"),
    1: os.path.join(BASE_PATH, "abnormal/transformer_rust"),
    2: os.path.join(BASE_PATH, "abnormal/wire_rust"),
    3: os.path.join(BASE_PATH, "abnormal/wire_peeling"),
}

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


定義快取函數(三維)

In [13]:
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

def generate_preprocessed_cache_3d(seq_lens, label_dirs, cache_dir="/content/preprocessed"):
    os.makedirs(cache_dir, exist_ok=True)

    for seq_len in seq_lens:
        cache_X = os.path.join(cache_dir, f"X_seq{seq_len}_3d.npy")
        cache_y = os.path.join(cache_dir, f"y_seq{seq_len}_3d.npy")

        if os.path.exists(cache_X) and os.path.exists(cache_y):
            print(f"📥 已存在 3D 快取：seq_len={seq_len}，略過")
            continue

        all_seq, all_labels = [], []
        for label, folder in label_dirs.items():
            for fname in os.listdir(folder):
                if fname.endswith(".csv"):
                    path = os.path.join(folder, fname)
                    df = pd.read_csv(path)
                    data = df[["voltage", "current", "power"]].values.astype(np.float32)
                    num_chunks = len(data) // seq_len
                    chunks = [data[i * seq_len : (i + 1) * seq_len] for i in range(num_chunks)]
                    all_seq.extend(chunks)
                    all_labels.extend([label] * len(chunks))

        seq_arr = np.array(all_seq, dtype=np.float32)
        labels_arr = np.array(all_labels, dtype=np.int64)
        B, T, F = seq_arr.shape
        reshaped = seq_arr.reshape(-1, F)
        scaled = StandardScaler().fit_transform(reshaped).reshape(B, T, F)

        np.save(cache_X, scaled)
        np.save(cache_y, labels_arr)
        print(f"✅ 完成 3D 快取：seq_len={seq_len}（共 {B} 筆序列）")

定義快取函數(二維)

In [14]:
def generate_preprocessed_cache_2d(seq_lens, label_dirs, cache_dir="/content/preprocessed"):
    os.makedirs(cache_dir, exist_ok=True)

    for seq_len in seq_lens:
        cache_X = os.path.join(cache_dir, f"X_seq{seq_len}_2d.npy")
        cache_y = os.path.join(cache_dir, f"y_seq{seq_len}_2d.npy")

        if os.path.exists(cache_X) and os.path.exists(cache_y):
            print(f"📥 已存在 2D 快取：seq_len={seq_len}，略過")
            continue

        all_features, all_labels = [], []
        for label, folder in label_dirs.items():
            for fname in os.listdir(folder):
                if fname.endswith(".csv"):
                    path = os.path.join(folder, fname)
                    df = pd.read_csv(path)
                    data = df[["voltage", "current", "power"]].values.astype(np.float32)
                    num_chunks = len(data) // seq_len
                    chunks = [data[i * seq_len : (i + 1) * seq_len] for i in range(num_chunks)]

                    for chunk in chunks:
                        features = []
                        features.extend(np.mean(chunk, axis=0))  # 3
                        features.extend(np.std(chunk, axis=0))   # 3
                        features.extend(np.max(chunk, axis=0))   # 3
                        features.extend(np.min(chunk, axis=0))   # 3
                        all_features.append(features)
                        all_labels.append(label)

        X = np.array(all_features, dtype=np.float32)
        y = np.array(all_labels, dtype=np.int64)

        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        np.save(cache_X, X_scaled)
        np.save(cache_y, y)
        print(f"✅ 完成 2D 快取：seq_len={seq_len}（共 {len(X_scaled)} 筆）")

實際執行快取

In [15]:
seq_lens = [4, 5, 8, 10]
generate_preprocessed_cache_3d(seq_lens, LABEL_DIRS)      # 產生 3D
generate_preprocessed_cache_2d(seq_lens, LABEL_DIRS)   # 產生 2D

📥 已存在 3D 快取：seq_len=4，略過
📥 已存在 3D 快取：seq_len=5，略過
📥 已存在 3D 快取：seq_len=8，略過
📥 已存在 3D 快取：seq_len=10，略過
📥 已存在 2D 快取：seq_len=4，略過
📥 已存在 2D 快取：seq_len=5，略過
📥 已存在 2D 快取：seq_len=8，略過
📥 已存在 2D 快取：seq_len=10，略過


LSTM

In [18]:
from google.colab import drive
import os
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# 掛載 Google Drive
drive.mount('/content/drive')

# === 資料夾與儲存路徑設定 ===
BASE_PATH = "/content/drive/MyDrive/Colab Notebooks/dataset"
LABEL_DIRS = {
    0: os.path.join(BASE_PATH, "normal"),
    1: os.path.join(BASE_PATH, "abnormal/transformer_rust"),
    2: os.path.join(BASE_PATH, "abnormal/wire_rust"),
    3: os.path.join(BASE_PATH, "abnormal/wire_peeling"),
}
RESULT_DIR = "./models/LSTM/result"
for sub in [
    "accuracy_curves",         # ✅ 每次 epoch 的平均準確率
    "loss_curves",             # ✅ 每次 epoch 的平均 loss
    "f1_score_curves",         # ✅ 每次 epoch 的 F1-score
    "confusion_matrices",      # ✅ 混淆矩陣圖
    "accuracy_train_vs_val",   # 🆕 Accuracy: Train vs Val 對照圖
    "loss_train_vs_val",       # 🆕 Loss: Train vs Val 對照圖
]:
    os.makedirs(os.path.join(RESULT_DIR, sub), exist_ok=True)

# === 自訂 Dataset ===
class ChargeSequenceDataset(Dataset):
    def __init__(self, sequences, labels):
        self.X = torch.tensor(sequences, dtype=torch.float32)
        self.y = torch.tensor(labels, dtype=torch.long)
    def __len__(self): return len(self.X)
    def __getitem__(self, idx): return self.X[idx], self.y[idx]

# === LSTM 模型 ===
class LSTMClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)
    def forward(self, x):
        out, _ = self.lstm(x)
        out = out[:, -1, :]
        return self.fc(out)

# === 資料處理 ===
def process_file(file_path, label, max_seq_len):
    df = pd.read_csv(file_path)

    # 只保留 voltage, current, power 三欄
    data = df[["voltage", "current", "power"]].values.astype(np.float32)

    # 用 max_seq_len 分段切割序列
    num_chunks = len(data) // max_seq_len
    chunks = [data[i * max_seq_len : (i + 1) * max_seq_len] for i in range(num_chunks)]

    return chunks, [label] * len(chunks)

def load_all_sequences(max_seq_len):
    all_seq, all_labels = [], []
    total_sequences = 0
    for label, folder in LABEL_DIRS.items():
        for fname in os.listdir(folder):
            if fname.endswith(".csv"):
                path = os.path.join(folder, fname)
                seqs, labels = process_file(path, label, max_seq_len)
                all_seq.extend(seqs)
                all_labels.extend(labels)
                total_sequences += len(seqs)

    seq_arr = np.array(all_seq, dtype=np.float32)
    labels_arr = np.array(all_labels, dtype=np.int64)
    B, T, F = seq_arr.shape
    reshaped = seq_arr.reshape(-1, F)
    scaled = StandardScaler().fit_transform(reshaped).reshape(B, T, F)

    print(f"📊 Total sequences loaded: {total_sequences}")

    return scaled, labels_arr

# === 主訓練與超參數搜尋函數（使用快取） ===
def train_and_search_lstm(batch_sizes, learning_rates, seq_lens,
                          num_epochs=100, hidden_dim=64, num_layers=1,
                          num_classes=4, k_folds=5):

    RESULT_DIR = "./models/LSTM/result"
    PREPROCESSED_DIR = "/content/preprocessed/"

    results = []
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    for bs in batch_sizes:
        for lr in learning_rates:
            for seq_len in seq_lens:
                print(f"\n🧪 BS={bs} | LR={lr} | SEQ={seq_len}")
                x_path = os.path.join(PREPROCESSED_DIR, f"X_seq{seq_len}_3d.npy")
                y_path = os.path.join(PREPROCESSED_DIR, f"y_seq{seq_len}_3d.npy")
                X = np.load(x_path)
                y = np.load(y_path)

                skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

                acc_curves, loss_curves, f1_curves = [], [], []
                acc_train_curves, loss_train_curves = [], []
                all_y_true, all_y_pred = [], []

                t0 = time.time()

                for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
                    X_train, X_val = X[train_idx], X[val_idx]
                    y_train, y_val = y[train_idx], y[val_idx]

                    train_loader = DataLoader(ChargeSequenceDataset(X_train, y_train), batch_size=bs, shuffle=True)
                    val_loader = DataLoader(ChargeSequenceDataset(X_val, y_val), batch_size=bs)

                    model = LSTMClassifier(input_dim=3, hidden_dim=hidden_dim,
                                           num_layers=num_layers, num_classes=num_classes).to(device)
                    optimizer = optim.Adam(model.parameters(), lr=lr)
                    criterion = nn.CrossEntropyLoss()

                    acc_list, loss_list, f1_list = [], [], []
                    acc_train_list, loss_train_list = [], []

                    for epoch in range(num_epochs):
                        # === Training ===
                        model.train()
                        total_loss_train, correct_train, total_train = 0, 0, 0
                        for xb, yb in train_loader:
                            xb, yb = xb.to(device), yb.to(device)
                            optimizer.zero_grad()
                            out = model(xb)
                            loss = criterion(out, yb)
                            loss.backward()
                            optimizer.step()

                            _, pred = torch.max(out, 1)
                            correct_train += (pred == yb).sum().item()
                            total_train += yb.size(0)
                            total_loss_train += loss.item() * xb.size(0)

                        acc_train = correct_train / total_train
                        loss_train = total_loss_train / total_train

                        acc_train_list.append(acc_train)
                        loss_train_list.append(loss_train)

                        # === Validation ===
                        model.eval()
                        correct, total, total_loss = 0, 0, 0
                        y_pred_epoch, y_true_epoch = [], []
                        with torch.no_grad():
                            for xb, yb in val_loader:
                                xb, yb = xb.to(device), yb.to(device)
                                out = model(xb)
                                loss = criterion(out, yb)
                                _, pred = torch.max(out, 1)
                                correct += (pred == yb).sum().item()
                                total += yb.size(0)
                                total_loss += loss.item() * xb.size(0)
                                y_pred_epoch.extend(pred.cpu().numpy())
                                y_true_epoch.extend(yb.cpu().numpy())

                        acc = correct / total
                        avg_loss = total_loss / total
                        f1 = f1_score(y_true_epoch, y_pred_epoch, average='macro', zero_division=0)

                        acc_list.append(acc)
                        loss_list.append(avg_loss)
                        f1_list.append(f1)

                        if (epoch + 1) % 10 == 0:
                            print(f"    [Fold {fold+1}] Epoch {epoch+1}/{num_epochs} | Acc: {acc:.4f} | Loss: {avg_loss:.4f}")

                    acc_curves.append(acc_list)
                    loss_curves.append(loss_list)
                    f1_curves.append(f1_list)
                    acc_train_curves.append(acc_train_list)
                    loss_train_curves.append(loss_train_list)
                    all_y_true.extend(y_true_epoch)
                    all_y_pred.extend(y_pred_epoch)

                t1 = time.time()

                def plot_metric(avg_list, metric_name, color, marker, folder):
                    plt.plot(range(1, num_epochs+1), avg_list, marker=marker, color=color)
                    plt.title(f"{metric_name.capitalize()} (BS={bs}, LR={lr}, SEQ={seq_len})")
                    plt.xlabel("Epoch")
                    plt.ylabel(metric_name.capitalize())
                    plt.ylim(0, 1.0 if metric_name != 'loss' else None)
                    plt.grid(True)
                    path = os.path.join(RESULT_DIR, folder, f"bs{bs}_lr{lr}_seq{seq_len}.png")
                    plt.savefig(path, bbox_inches='tight')
                    plt.close()

                # === Plot ===
                plot_metric(np.mean(acc_curves, axis=0), "accuracy", "black", "o", "accuracy_curves")
                plot_metric(np.mean(loss_curves, axis=0), "loss", "orange", "s", "loss_curves")
                plot_metric(np.mean(f1_curves, axis=0), "f1_score", "purple", "^", "f1_score_curves")
                plot_metric(np.mean(acc_train_curves, axis=0), "accuracy", "red", "o", "accuracy_train_vs_val")
                plot_metric(np.mean(loss_train_curves, axis=0), "loss", "blue", "s", "loss_train_vs_val")

                # 混淆矩陣
                cm = confusion_matrix(all_y_true, all_y_pred)
                disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=np.unique(y))
                disp.plot(cmap='Blues', values_format='d')
                plt.title(f"Confusion Matrix\nBS={bs} LR={lr} SEQ={seq_len}")
                plt.savefig(os.path.join(RESULT_DIR, "confusion_matrices", f"bs{bs}_lr{lr}_seq{seq_len}.png"), bbox_inches='tight')
                plt.close()

                final_result = {
                    'batch_size': bs, 'learning_rate': lr, 'seq_len': seq_len,
                    'final_acc': acc_list[-1], 'final_loss': loss_list[-1],
                    'precision': precision_score(all_y_true, all_y_pred, average='macro', zero_division=0),
                    'recall': recall_score(all_y_true, all_y_pred, average='macro', zero_division=0),
                    'f1_score': f1_score(all_y_true, all_y_pred, average='macro', zero_division=0),
                    'training_time_s': round(t1 - t0, 2)
                }
                results.append(final_result)

                print(f"\n📊 統計結果 (BS={bs}, LR={lr}, SEQ={seq_len}):")
                print(f"  🔹 Final Accuracy : {final_result['final_acc']:.4f}")
                print(f"  🔹 F1-score       : {final_result['f1_score']:.4f}")
                print(f"  ⏱️  Training Time  : {final_result['training_time_s']} 秒")

    df = pd.DataFrame(results)
    csv_path = os.path.join(RESULT_DIR, "lstm_experiment_results.csv")
    df.to_csv(csv_path, index=False)
    print(f"\n📄 All experiment results saved to {csv_path}")

    if not df.empty:
        best = df.loc[df['final_acc'].idxmax()]
        print(f"\n🏆 Best: BS={best['batch_size']} | LR={best['learning_rate']} | SEQ={best['seq_len']} | ACC={best['final_acc']:.4f}")
        return results, best
    else:
        print("\n❗ No valid results")
        return [], None

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# 設定訓練參數
batch_sizes = [4, 8, 16]
learning_rates = [1e-3, 1e-4]
seq_lens = [4, 5, 8, 10]
num_epochs = 100

# 顯示總組數
total_combinations = len(batch_sizes) * len(learning_rates) * len(seq_lens)
print(f"🔍 總共訓練組數：{total_combinations}，每組訓練 {num_epochs} epochs")

# 執行網格搜尋訓練
results, best = train_and_search_lstm(batch_sizes, learning_rates, seq_lens, num_epochs=num_epochs)

# 輸出最佳參數與指標
if best is not None:
    print("\n🎯 最佳參數組合：")
    print(f"Batch Size     = {best['batch_size']}")
    print(f"Learning Rate  = {best['learning_rate']}")
    print(f"Sequence Length= {best['seq_len']}")
    print(f"Final Accuracy = {best['final_acc']:.4f}")
    print(f"Final Loss     = {best['final_loss']:.4f}")
    print(f"Precision      = {best['precision']:.4f}")
    print(f"Recall         = {best['recall']:.4f}")
    print(f"F1-score       = {best['f1_score']:.4f}")
    print(f"Training Time  = {best['training_time_s']} 秒")
else:
    print("❗ 沒有有效結果（準確率全部為 1.0）")

🔍 總共訓練組數：24，每組訓練 100 epochs

🧪 BS=4 | LR=0.001 | SEQ=4
    [Fold 1] Epoch 10/100 | Acc: 0.9832 | Loss: 0.0545


MLP

In [None]:
import os
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# === 資料夾設定 ===
BASE_PATH = "/content/drive/MyDrive/Colab Notebooks/dataset"
LABEL_DIRS = {
    0: os.path.join(BASE_PATH, "normal"),
    1: os.path.join(BASE_PATH, "abnormal/transformer_rust"),
    2: os.path.join(BASE_PATH, "abnormal/wire_rust"),
    3: os.path.join(BASE_PATH, "abnormal/wire_peeling"),
}
RESULT_DIR = "./models/MLP/result"
for sub in ["accuracy_curves", "loss_curves", "confusion_matrices",
            "precision_curves", "recall_curves", "f1_score_curves"]:
    os.makedirs(os.path.join(RESULT_DIR, sub), exist_ok=True)

# === 自訂 Dataset ===
class ChargeSequenceDataset(Dataset):
    def __init__(self, sequences, labels):
        self.X = torch.tensor(sequences, dtype=torch.float32)
        self.y = torch.tensor(labels, dtype=torch.long)
    def __len__(self): return len(self.X)
    def __getitem__(self, idx): return self.X[idx], self.y[idx]

# === MLP 模型 ===
class MLPClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_classes)
        )
    def forward(self, x):
        return self.model(x)

# === 資料處理 ===
def process_file(file_path, label, max_seq_len):
    df = pd.read_csv(file_path)

    # ✅ 只保留 voltage, current, power 三欄
    data = df[["voltage", "current", "power"]].values.astype(np.float32)

    # 切割成固定長度序列
    num_chunks = len(data) // max_seq_len
    chunks = [data[i * max_seq_len : (i + 1) * max_seq_len] for i in range(num_chunks)]

    return chunks, [label] * len(chunks)

def load_all_sequences(max_seq_len):
    all_seq, all_labels = [], []

    for label, folder in LABEL_DIRS.items():
        for fname in os.listdir(folder):
            if fname.endswith(".csv"):
                path = os.path.join(folder, fname)
                seqs, labels = process_file(path, label, max_seq_len)
                all_seq.extend(seqs)
                all_labels.extend(labels)

    seq_arr = np.array(all_seq, dtype=np.float32)  # shape = (samples, seq_len, features)
    num_samples = seq_arr.shape[0]

    # ✅ 攤平成 2D：轉為 (samples, seq_len * features)
    seq_reshaped = seq_arr.reshape(num_samples, -1)

    # ✅ 標準化（只對 2D 資料做）
    scaled = StandardScaler().fit_transform(seq_reshaped)

    print(f"📊 Total sequences loaded: {num_samples}")

    return scaled, np.array(all_labels, dtype=np.int64)

# === 訓練與儲存 ===
def train_and_search_mlp(batch_sizes, learning_rates, seq_lens, num_epochs=100, hidden_dim=128, num_classes=4):
    results = []
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    for bs in batch_sizes:
        for lr in learning_rates:
            for seq_len in seq_lens:
                print(f"\n\U0001f9ea BS={bs} | LR={lr} | SEQ={seq_len}")
                X, y = load_all_sequences(seq_len)
                X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
                train_loader = DataLoader(ChargeSequenceDataset(X_train, y_train), batch_size=bs, shuffle=True)
                test_loader = DataLoader(ChargeSequenceDataset(X_test, y_test), batch_size=bs)
                input_dim = X_train.shape[1]
                model = MLPClassifier(input_dim, hidden_dim, num_classes).to(device)
                optimizer = optim.Adam(model.parameters(), lr=lr)
                criterion = nn.CrossEntropyLoss()

                acc_list, loss_list = [], []
                precision_list, recall_list, f1_list = [], [], []

                t0 = time.time()
                for epoch in range(num_epochs):
                    model.train()
                    for xb, yb in train_loader:
                        xb, yb = xb.to(device), yb.to(device)
                        optimizer.zero_grad()
                        out = model(xb)
                        loss = criterion(out, yb)
                        loss.backward()
                        optimizer.step()

                    # Evaluate
                    model.eval()
                    correct, total, total_loss = 0, 0, 0
                    y_pred, y_true = [], []
                    with torch.no_grad():
                        for xb, yb in test_loader:
                            xb, yb = xb.to(device), yb.to(device)
                            out = model(xb)
                            loss = criterion(out, yb)
                            _, pred = torch.max(out, 1)
                            correct += (pred == yb).sum().item()
                            total += yb.size(0)
                            total_loss += loss.item() * xb.size(0)
                            y_pred.extend(pred.cpu().numpy())
                            y_true.extend(yb.cpu().numpy())
                    acc = correct / total
                    avg_loss = total_loss / total
                    acc_list.append(acc)
                    loss_list.append(avg_loss)
                    precision_list.append(precision_score(y_true, y_pred, average='macro', zero_division=0))
                    recall_list.append(recall_score(y_true, y_pred, average='macro', zero_division=0))
                    f1_list.append(f1_score(y_true, y_pred, average='macro', zero_division=0))
                    print(f"Epoch {epoch+1}/{num_epochs} | Acc: {acc:.4f} | Loss: {avg_loss:.4f}")
                t1 = time.time()

                if acc_list[-1] >= 1.0:
                    print(f"\u26a0\ufe0f Skipped BS={bs} LR={lr} SEQ={seq_len} due to acc=1.0")
                    continue

                # 儲存折線圖
                def save_curve(data, ylabel, folder):
                    plt.plot(range(1, num_epochs+1), data, marker='o')
                    plt.title(f"{ylabel} (BS={bs}, LR={lr}, SEQ={seq_len})")
                    plt.xlabel("Epoch"); plt.ylabel(ylabel); plt.grid(True)
                    if ylabel == "Accuracy": plt.ylim(0, 1.0)
                    plt.savefig(os.path.join(RESULT_DIR, folder, f"bs{bs}_lr{lr}_seq{seq_len}.png"), bbox_inches='tight')
                    plt.close()

                save_curve(acc_list, "Accuracy", "accuracy_curves")
                save_curve(loss_list, "Loss", "loss_curves")
                save_curve(precision_list, "Precision", "precision_curves")
                save_curve(recall_list, "Recall", "recall_curves")
                save_curve(f1_list, "F1-score", "f1_score_curves")

                # 混淆矩陣圖
                disp = ConfusionMatrixDisplay(confusion_matrix=confusion_matrix(y_true, y_pred), display_labels=np.unique(y))
                disp.plot(cmap='Blues', values_format='d')
                plt.title(f"Confusion Matrix\nBS={bs} LR={lr} SEQ={seq_len}")
                plt.savefig(os.path.join(RESULT_DIR, "confusion_matrices", f"bs{bs}_lr{lr}_seq{seq_len}.png"), bbox_inches='tight')
                plt.close()

                results.append({
                    'batch_size': bs, 'learning_rate': lr, 'seq_len': seq_len,
                    'final_acc': acc_list[-1], 'final_loss': loss_list[-1],
                    'precision': precision_list[-1], 'recall': recall_list[-1], 'f1_score': f1_list[-1],
                    'training_time_s': round(t1 - t0, 2)
                })

    df = pd.DataFrame(results)
    csv_path = os.path.join(RESULT_DIR, "mlp_experiment_results.csv")
    df.to_csv(csv_path, index=False)
    print(f"\n\U0001f4dc All experiment results saved to {csv_path}")

    if not df.empty:
        best = df.loc[df['final_acc'].idxmax()]
        print(f"\n\U0001f3c6 Best: BS={best['batch_size']} | LR={best['learning_rate']} | SEQ={best['seq_len']} | ACC={best['final_acc']:.4f}")
        return results, best
    else:
        print("\n❗ No valid results (all accuracy = 1.0 were skipped)")
        return [], None

In [None]:
# 設定訓練參數
batch_sizes = [4, 8, 16]
learning_rates = [1e-3, 1e-4]
seq_lens = [4, 8, 10]
num_epochs = 100

# 顯示總組數
total_combinations = len(batch_sizes) * len(learning_rates) * len(seq_lens)
print(f"🔍 總共訓練組數：{total_combinations}，每組訓練 {num_epochs} epochs")

# 執行網格搜尋訓練
results, best = train_and_search_mlp(batch_sizes, learning_rates, seq_lens, num_epochs=num_epochs)

# 輸出最佳參數與指標
if best is not None:
    print("\n🎯 最佳參數組合：")
    print(f"Batch Size     = {best['batch_size']}")
    print(f"Learning Rate  = {best['learning_rate']}")
    print(f"Sequence Length= {best['seq_len']}")
    print(f"Final Accuracy = {best['final_acc']:.4f}")
    print(f"Final Loss     = {best['final_loss']:.4f}")
    print(f"Precision      = {best['precision']:.4f}")
    print(f"Recall         = {best['recall']:.4f}")
    print(f"F1-score       = {best['f1_score']:.4f}")
    print(f"Training Time  = {best['training_time_s']} 秒")
else:
    print("❗ 沒有有效結果（準確率全部為 1.0）")


🔍 總共訓練組數：18，每組訓練 100 epochs

🧪 BS=4 | LR=0.001 | SEQ=4
📊 Total sequences loaded: 10023
Epoch 1/100 | Acc: 0.9776 | Loss: 0.1761
Epoch 2/100 | Acc: 0.9810 | Loss: 0.1027
Epoch 3/100 | Acc: 0.9830 | Loss: 0.0764
Epoch 4/100 | Acc: 0.9880 | Loss: 0.0567
Epoch 5/100 | Acc: 0.9895 | Loss: 0.0498
Epoch 6/100 | Acc: 0.9885 | Loss: 0.0496
Epoch 7/100 | Acc: 0.9890 | Loss: 0.0463
Epoch 8/100 | Acc: 0.9875 | Loss: 0.0476
Epoch 9/100 | Acc: 0.9890 | Loss: 0.0497
Epoch 10/100 | Acc: 0.9905 | Loss: 0.0407
Epoch 11/100 | Acc: 0.9875 | Loss: 0.0501
Epoch 12/100 | Acc: 0.9900 | Loss: 0.0406
Epoch 13/100 | Acc: 0.9905 | Loss: 0.0386
Epoch 14/100 | Acc: 0.9910 | Loss: 0.0374
Epoch 15/100 | Acc: 0.9900 | Loss: 0.0392
Epoch 16/100 | Acc: 0.9915 | Loss: 0.0366
Epoch 17/100 | Acc: 0.9925 | Loss: 0.0370
Epoch 18/100 | Acc: 0.9880 | Loss: 0.0458
Epoch 19/100 | Acc: 0.9920 | Loss: 0.0345
Epoch 20/100 | Acc: 0.9915 | Loss: 0.0382
Epoch 21/100 | Acc: 0.9935 | Loss: 0.0319
Epoch 22/100 | Acc: 0.9930 | Loss: 0.035

SVM

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# === 路徑設定 ===
BASE_PATH = "/content/drive/MyDrive/Colab Notebooks/dataset"
LABEL_DIRS = {
    0: os.path.join(BASE_PATH, "normal"),
    1: os.path.join(BASE_PATH, "abnormal/transformer_rust"),
    2: os.path.join(BASE_PATH, "abnormal/wire_rust"),
    3: os.path.join(BASE_PATH, "abnormal/wire_peeling"),
}
RESULT_DIR = "./models/SVM/result"
os.makedirs(os.path.join(RESULT_DIR, "confusion_matrices"), exist_ok=True)

# === 資料處理 ===
def process_file(file_path, label, max_seq_len):
    df = pd.read_csv(file_path)

    # 只保留 voltage, current, power 三欄
    data = df[["voltage", "current", "power"]].values.astype(np.float32)

    # 用 max_seq_len 分段切割序列
    num_chunks = len(data) // max_seq_len
    chunks = [data[i * max_seq_len : (i + 1) * max_seq_len] for i in range(num_chunks)]

    return chunks, [label] * len(chunks)

def load_all_sequences(max_seq_len):
    all_seq, all_labels = [], []
    for label, folder in LABEL_DIRS.items():
        for fname in os.listdir(folder):
            if fname.endswith(".csv"):
                path = os.path.join(folder, fname)
                seqs, labels = process_file(path, label, max_seq_len)
                all_seq.extend(seqs)
                all_labels.extend(labels)

    seq_arr = np.array(all_seq, dtype=np.float32)  # shape = (samples, seq_len, features)
    num_samples = seq_arr.shape[0]

    # 攤平成 2D：變成 (samples, seq_len * features)
    seq_reshaped = seq_arr.reshape(num_samples, -1)

    # 標準化
    scaled = StandardScaler().fit_transform(seq_reshaped)

    print(f"📊 Total sequences loaded: {num_samples}")

    return scaled, np.array(all_labels, dtype=np.int64)

# === 訓練與測試 SVM ===
import time
def train_svm_once(kernels, Cs, seq_lens):
    results = []

    for kernel in kernels:
        for C_val in Cs:
            for seq_len in seq_lens:
                print(f"\n🔎 Kernel={kernel} | C={C_val} | SEQ={seq_len}")
                X, y = load_all_sequences(seq_len)
                X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

                t0 = time.time()
                clf = SVC(kernel=kernel, C=C_val)
                clf.fit(X_train, y_train)
                t1 = time.time()

                y_pred = clf.predict(X_test)
                acc = accuracy_score(y_test, y_pred)

                # ⚠️ 跳過準確率為 1.0 的情況
                if acc >= 1.0:
                    print(f"⚠️ Skipped Kernel={kernel} C={C_val} SEQ={seq_len} due to acc=1.0")
                    continue

                precision = precision_score(y_test, y_pred, average='macro', zero_division=0)
                recall = recall_score(y_test, y_pred, average='macro', zero_division=0)
                f1 = f1_score(y_test, y_pred, average='macro', zero_division=0)

                # 混淆矩陣圖
                disp = ConfusionMatrixDisplay(confusion_matrix(y_test, y_pred), display_labels=np.unique(y))
                disp.plot(cmap='Blues', values_format='d')
                plt.title(f"Confusion Matrix\nKernel={kernel} C={C_val} SEQ={seq_len}")
                plt.savefig(os.path.join(RESULT_DIR, "confusion_matrices", f"k{kernel}_c{C_val}_seq{seq_len}.png"), bbox_inches='tight')
                plt.close()

                results.append({
                    'kernel': kernel, 'C': C_val, 'seq_len': seq_len,
                    'final_acc': acc, 'precision': precision, 'recall': recall,
                    'f1_score': f1, 'training_time_s': round(t1 - t0, 2)
                })

    df = pd.DataFrame(results)
    csv_path = os.path.join(RESULT_DIR, "svm_experiment_results.csv")
    df.to_csv(csv_path, index=False)
    print(f"\n📄 Results saved to {csv_path}")

    if not df.empty:
        best = df.loc[df['final_acc'].idxmax()]
        print(f"\n🏆 Best: Kernel={best['kernel']} | C={best['C']} | SEQ={best['seq_len']} | ACC={best['final_acc']:.4f}")
        return results, best
    else:
        print("\n❗ No valid results")
        return [], None

In [None]:
seq_lens = [4, 8, 10]
C_list = [1.0, 10.0]
kernel_list = ["rbf", "linear"]

results, best = train_svm_once(kernel_list, C_list, seq_lens)

print("\n🎯 最佳參數組合：")
print(f"Sequence Length = {best['seq_len']}")
print(f"C = {best['C']}")
print(f"Kernel = {best['kernel']}")
print(f"Accuracy = {best['final_acc']:.4f}")
print(f"Precision = {best['precision']:.4f}")
print(f"Recall = {best['recall']:.4f}")
print(f"F1-score = {best['f1_score']:.4f}")
print(f"Training Time = {best['training_time_s']} 秒")


🔎 Kernel=rbf | C=1.0 | SEQ=4
📊 Total sequences loaded: 10023

🔎 Kernel=rbf | C=1.0 | SEQ=8
📊 Total sequences loaded: 5001

🔎 Kernel=rbf | C=1.0 | SEQ=10
📊 Total sequences loaded: 4010

🔎 Kernel=rbf | C=10.0 | SEQ=4
📊 Total sequences loaded: 10023

🔎 Kernel=rbf | C=10.0 | SEQ=8
📊 Total sequences loaded: 5001

🔎 Kernel=rbf | C=10.0 | SEQ=10
📊 Total sequences loaded: 4010

🔎 Kernel=linear | C=1.0 | SEQ=4
📊 Total sequences loaded: 10023

🔎 Kernel=linear | C=1.0 | SEQ=8
📊 Total sequences loaded: 5001

🔎 Kernel=linear | C=1.0 | SEQ=10
📊 Total sequences loaded: 4010

🔎 Kernel=linear | C=10.0 | SEQ=4
📊 Total sequences loaded: 10023

🔎 Kernel=linear | C=10.0 | SEQ=8
📊 Total sequences loaded: 5001

🔎 Kernel=linear | C=10.0 | SEQ=10
📊 Total sequences loaded: 4010

📄 Results saved to ./models/SVM/result/svm_experiment_results.csv

🏆 Best: Kernel=rbf | C=10.0 | SEQ=8 | ACC=0.9950

🎯 最佳參數組合：
Sequence Length = 8
C = 10.0
Kernel = rbf
Accuracy = 0.9950
Precision = 0.9936
Recall = 0.9927
F1-score = 0.

GRU

In [None]:
import os
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# === 資料夾設定 ===
BASE_PATH = "/content/drive/MyDrive/Colab Notebooks/dataset"
LABEL_DIRS = {
    0: os.path.join(BASE_PATH, "normal"),
    1: os.path.join(BASE_PATH, "abnormal/transformer_rust"),
    2: os.path.join(BASE_PATH, "abnormal/wire_rust"),
    3: os.path.join(BASE_PATH, "abnormal/wire_peeling"),
}
RESULT_DIR = "./models/GRU/result"
for sub in ["accuracy_curves", "loss_curves", "confusion_matrices",
            "precision_curves", "recall_curves", "f1_score_curves"]:
    os.makedirs(os.path.join(RESULT_DIR, sub), exist_ok=True)

# === 自訂 Dataset ===
class ChargeSequenceDataset(Dataset):
    def __init__(self, sequences, labels):
        self.X = torch.tensor(sequences, dtype=torch.float32)
        self.y = torch.tensor(labels, dtype=torch.long)
    def __len__(self): return len(self.X)
    def __getitem__(self, idx): return self.X[idx], self.y[idx]

# === GRU 模型 ===
class GRUClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes):
        super().__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)
    def forward(self, x):
        out, _ = self.gru(x)
        out = out[:, -1, :]
        return self.fc(out)

# === 資料處理 ===
def process_file(file_path, label, max_seq_len):
    df = pd.read_csv(file_path)

    # 只保留 voltage, current, power 三欄
    data = df[["voltage", "current", "power"]].values.astype(np.float32)

    # 用 max_seq_len 分段切割序列
    num_chunks = len(data) // max_seq_len
    chunks = [data[i * max_seq_len : (i + 1) * max_seq_len] for i in range(num_chunks)]

    return chunks, [label] * len(chunks)

def load_all_sequences(max_seq_len):
    all_seq, all_labels = [], []
    total_sequences = 0
    for label, folder in LABEL_DIRS.items():
        for fname in os.listdir(folder):
            if fname.endswith(".csv"):
                path = os.path.join(folder, fname)
                seqs, labels = process_file(path, label, max_seq_len)
                all_seq.extend(seqs)
                all_labels.extend(labels)
                total_sequences += len(seqs)

    seq_arr = np.array(all_seq, dtype=np.float32)
    labels_arr = np.array(all_labels, dtype=np.int64)
    B, T, F = seq_arr.shape
    reshaped = seq_arr.reshape(-1, F)
    scaled = StandardScaler().fit_transform(reshaped).reshape(B, T, F)

    print(f"📊 Total sequences loaded: {total_sequences}")

    return scaled, labels_arr

# === 訓練與儲存 ===
def train_and_search_gru(batch_sizes, learning_rates, seq_lens, num_epochs=100, hidden_dim=64, num_layers=1, num_classes=4):
    results = []
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    for bs in batch_sizes:
        for lr in learning_rates:
            for seq_len in seq_lens:
                print(f"\n🧪 BS={bs} | LR={lr} | SEQ={seq_len}")
                X, y = load_all_sequences(seq_len)
                X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
                train_loader = DataLoader(ChargeSequenceDataset(X_train, y_train), batch_size=bs, shuffle=True)
                test_loader = DataLoader(ChargeSequenceDataset(X_test, y_test), batch_size=bs)
                model = GRUClassifier(input_dim=3, hidden_dim=hidden_dim, num_layers=num_layers, num_classes=num_classes).to(device)
                optimizer = optim.Adam(model.parameters(), lr=lr)
                criterion = nn.CrossEntropyLoss()

                acc_list, loss_list = [], []
                precision_list, recall_list, f1_list = [], [], []
                t0 = time.time()

                for epoch in range(num_epochs):
                    model.train()
                    for xb, yb in train_loader:
                        xb, yb = xb.to(device), yb.to(device)
                        optimizer.zero_grad()
                        out = model(xb)
                        loss = criterion(out, yb)
                        loss.backward()
                        optimizer.step()

                    # Evaluate
                    model.eval()
                    correct, total, total_loss = 0, 0, 0
                    y_pred, y_true = [], []
                    with torch.no_grad():
                        for xb, yb in test_loader:
                            xb, yb = xb.to(device), yb.to(device)
                            out = model(xb)
                            loss = criterion(out, yb)
                            _, pred = torch.max(out, 1)
                            correct += (pred == yb).sum().item()
                            total += yb.size(0)
                            total_loss += loss.item() * xb.size(0)
                            y_pred.extend(pred.cpu().numpy())
                            y_true.extend(yb.cpu().numpy())

                    acc = correct / total
                    avg_loss = total_loss / total
                    precision = precision_score(y_true, y_pred, average='macro', zero_division=0)
                    recall = recall_score(y_true, y_pred, average='macro', zero_division=0)
                    f1 = f1_score(y_true, y_pred, average='macro', zero_division=0)

                    acc_list.append(acc)
                    loss_list.append(avg_loss)
                    precision_list.append(precision)
                    recall_list.append(recall)
                    f1_list.append(f1)

                    print(f"Epoch {epoch+1}/{num_epochs} | Acc: {acc:.4f} | Loss: {avg_loss:.4f}")

                t1 = time.time()

                if acc_list[-1] >= 1.0:
                    print(f"⚠️ Skipped BS={bs} LR={lr} SEQ={seq_len} due to acc=1.0")
                    continue

                # === 儲存折線圖 ===
                def save_curve(data, name, ylabel):
                    plt.plot(range(1, num_epochs+1), data, marker='o')
                    plt.title(f"{ylabel} (BS={bs}, LR={lr}, SEQ={seq_len})")
                    plt.xlabel("Epoch"); plt.ylabel(ylabel); plt.ylim(0, 1.0); plt.grid(True)
                    path = os.path.join(RESULT_DIR, f"{name}_curves", f"bs{bs}_lr{lr}_seq{seq_len}.png")
                    plt.savefig(path, bbox_inches='tight')
                    plt.close()

                save_curve(acc_list, "accuracy", "Accuracy")
                save_curve(loss_list, "loss", "Loss")
                save_curve(precision_list, "precision", "Precision")
                save_curve(recall_list, "recall", "Recall")
                save_curve(f1_list, "f1_score", "F1-score")

                # 混淆矩陣
                disp = ConfusionMatrixDisplay(confusion_matrix=confusion_matrix(y_true, y_pred), display_labels=np.unique(y))
                disp.plot(cmap='Blues', values_format='d')
                plt.title(f"Confusion Matrix\nBS={bs} LR={lr} SEQ={seq_len}")
                plt.savefig(os.path.join(RESULT_DIR, "confusion_matrices", f"bs{bs}_lr{lr}_seq{seq_len}.png"), bbox_inches='tight')
                plt.close()

                # 儲存結果
                results.append({
                    'batch_size': bs, 'learning_rate': lr, 'seq_len': seq_len,
                    'final_acc': acc_list[-1], 'final_loss': loss_list[-1],
                    'precision': precision_list[-1], 'recall': recall_list[-1], 'f1_score': f1_list[-1],
                    'training_time_s': round(t1 - t0, 2)
                })

    df = pd.DataFrame(results)
    csv_path = os.path.join(RESULT_DIR, "gru_experiment_results.csv")
    df.to_csv(csv_path, index=False)
    print(f"\n📄 All experiment results saved to {csv_path}")

    if not df.empty:
        best = df.loc[df['final_acc'].idxmax()]
        print(f"\n🏆 Best: BS={best['batch_size']} | LR={best['learning_rate']} | SEQ={best['seq_len']} | ACC={best['final_acc']:.4f}")
        return results, best
    else:
        print("\n❗ No valid results (all accuracy = 1.0 were skipped)")
        return [], None

In [None]:
# 設定訓練參數
batch_sizes = [4, 8, 16]
learning_rates = [1e-3, 1e-4]
seq_lens = [4, 8, 10]
num_epochs = 100

# 顯示總組數
total_combinations = len(batch_sizes) * len(learning_rates) * len(seq_lens)
print(f"🔍 總共訓練組數：{total_combinations}，每組訓練 {num_epochs} epochs")

# 執行網格搜尋訓練
results, best = train_and_search_gru(batch_sizes, learning_rates, seq_lens, num_epochs=num_epochs)

# 輸出最佳參數與指標
if best is not None:
    print("\n🎯 最佳參數組合：")
    print(f"Batch Size     = {best['batch_size']}")
    print(f"Learning Rate  = {best['learning_rate']}")
    print(f"Sequence Length= {best['seq_len']}")
    print(f"Final Accuracy = {best['final_acc']:.4f}")
    print(f"Final Loss     = {best['final_loss']:.4f}")
    print(f"Precision      = {best['precision']:.4f}")
    print(f"Recall         = {best['recall']:.4f}")
    print(f"F1-score       = {best['f1_score']:.4f}")
    print(f"Training Time  = {best['training_time_s']} 秒")
else:
    print("❗ 沒有有效結果（準確率全部為 1.0）")

🔍 總共訓練組數：18，每組訓練 100 epochs

🧪 BS=4 | LR=0.001 | SEQ=4
📊 Total sequences loaded: 10023
Epoch 1/100 | Acc: 0.9835 | Loss: 0.0811
Epoch 2/100 | Acc: 0.9880 | Loss: 0.0585
Epoch 3/100 | Acc: 0.9830 | Loss: 0.0630
Epoch 4/100 | Acc: 0.9890 | Loss: 0.0434
Epoch 5/100 | Acc: 0.9885 | Loss: 0.0410
Epoch 6/100 | Acc: 0.9910 | Loss: 0.0371
Epoch 7/100 | Acc: 0.9900 | Loss: 0.0342
Epoch 8/100 | Acc: 0.9850 | Loss: 0.0491
Epoch 9/100 | Acc: 0.9915 | Loss: 0.0302
Epoch 10/100 | Acc: 0.9930 | Loss: 0.0294
Epoch 11/100 | Acc: 0.9900 | Loss: 0.0291
Epoch 12/100 | Acc: 0.9925 | Loss: 0.0262
Epoch 13/100 | Acc: 0.9915 | Loss: 0.0341
Epoch 14/100 | Acc: 0.9940 | Loss: 0.0320
Epoch 15/100 | Acc: 0.9915 | Loss: 0.0291
Epoch 16/100 | Acc: 0.9935 | Loss: 0.0255
Epoch 17/100 | Acc: 0.9935 | Loss: 0.0263
Epoch 18/100 | Acc: 0.9920 | Loss: 0.0289
Epoch 19/100 | Acc: 0.9875 | Loss: 0.0429
Epoch 20/100 | Acc: 0.9945 | Loss: 0.0253
Epoch 21/100 | Acc: 0.9945 | Loss: 0.0260
Epoch 22/100 | Acc: 0.9920 | Loss: 0.024

1D CNN

In [None]:
# === CNN 模型完整訓練程式 ===
import os
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# === 資料夾設定 ===
BASE_PATH = "/content/drive/MyDrive/Colab Notebooks/dataset"
LABEL_DIRS = {
    0: os.path.join(BASE_PATH, "normal"),
    1: os.path.join(BASE_PATH, "abnormal/transformer_rust"),
    2: os.path.join(BASE_PATH, "abnormal/wire_rust"),
    3: os.path.join(BASE_PATH, "abnormal/wire_peeling"),
}
RESULT_DIR = "./models/CNN/result"
for sub in ["accuracy_curves", "loss_curves", "confusion_matrices",
            "precision_curves", "recall_curves", "f1_score_curves"]:
    os.makedirs(os.path.join(RESULT_DIR, sub), exist_ok=True)

# === Dataset ===
class ChargeSequenceDataset(Dataset):
    def __init__(self, sequences, labels):
        self.X = torch.tensor(sequences, dtype=torch.float32).unsqueeze(1)  # [B, 1, T, F]
        self.y = torch.tensor(labels, dtype=torch.long)
    def __len__(self): return len(self.X)
    def __getitem__(self, idx): return self.X[idx], self.y[idx]

# === CNN 模型 ===
class CNNClassifier(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(32, num_classes)
        )
    def forward(self, x):
        return self.model(x)

# === 資料處理 ===
def process_file(file_path, label, max_seq_len):
    df = pd.read_csv(file_path)

    # 只保留 voltage, current, power 三欄
    data = df[["voltage", "current", "power"]].values.astype(np.float32)

    # 用 max_seq_len 分段切割序列
    num_chunks = len(data) // max_seq_len
    chunks = [data[i * max_seq_len : (i + 1) * max_seq_len] for i in range(num_chunks)]

    return chunks, [label] * len(chunks)

def load_all_sequences(max_seq_len):
    all_seq, all_labels = [], []
    total_sequences = 0
    for label, folder in LABEL_DIRS.items():
        for fname in os.listdir(folder):
            if fname.endswith(".csv"):
                path = os.path.join(folder, fname)
                seqs, labels = process_file(path, label, max_seq_len)
                all_seq.extend(seqs)
                all_labels.extend(labels)
                total_sequences += len(seqs)

    seq_arr = np.array(all_seq, dtype=np.float32)
    labels_arr = np.array(all_labels, dtype=np.int64)
    B, T, F = seq_arr.shape
    reshaped = seq_arr.reshape(-1, F)
    scaled = StandardScaler().fit_transform(reshaped).reshape(B, T, F)

    print(f"📊 Total sequences loaded: {total_sequences}")

    return scaled, labels_arr

# === 訓練主函式 ===
def train_and_search_cnn(batch_sizes, learning_rates, seq_lens, num_epochs=100, num_classes=4):
    results = []
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    for bs in batch_sizes:
        for lr in learning_rates:
            for seq_len in seq_lens:
                print(f"\n🧪 BS={bs} | LR={lr} | SEQ={seq_len}")
                X, y = load_all_sequences(seq_len)
                X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
                train_loader = DataLoader(ChargeSequenceDataset(X_train, y_train), batch_size=bs, shuffle=True)
                test_loader = DataLoader(ChargeSequenceDataset(X_test, y_test), batch_size=bs)
                model = CNNClassifier(num_classes=num_classes).to(device)
                optimizer = optim.Adam(model.parameters(), lr=lr)
                criterion = nn.CrossEntropyLoss()

                acc_list, loss_list, prec_list, rec_list, f1_list = [], [], [], [], []
                t0 = time.time()
                for epoch in range(num_epochs):
                    model.train()
                    for xb, yb in train_loader:
                        xb, yb = xb.to(device), yb.to(device)
                        optimizer.zero_grad()
                        out = model(xb)
                        loss = criterion(out, yb)
                        loss.backward()
                        optimizer.step()
                    # Evaluate
                    model.eval()
                    correct, total, total_loss = 0, 0, 0
                    y_true, y_pred = [], []
                    with torch.no_grad():
                        for xb, yb in test_loader:
                            xb, yb = xb.to(device), yb.to(device)
                            out = model(xb)
                            loss = criterion(out, yb)
                            _, pred = torch.max(out, 1)
                            correct += (pred == yb).sum().item()
                            total += yb.size(0)
                            total_loss += loss.item() * xb.size(0)
                            y_pred.extend(pred.cpu().numpy())
                            y_true.extend(yb.cpu().numpy())
                    acc = correct / total
                    avg_loss = total_loss / total
                    acc_list.append(acc)
                    loss_list.append(avg_loss)
                    prec_list.append(precision_score(y_true, y_pred, average='macro', zero_division=0))
                    rec_list.append(recall_score(y_true, y_pred, average='macro', zero_division=0))
                    f1_list.append(f1_score(y_true, y_pred, average='macro', zero_division=0))
                    print(f"Epoch {epoch+1}/{num_epochs} | Acc: {acc:.4f} | Loss: {avg_loss:.4f}")
                t1 = time.time()

                if acc_list[-1] >= 1.0:
                    print(f"⚠️ Skipped BS={bs} LR={lr} SEQ={seq_len} due to acc=1.0")
                    continue

                # 儲存所有曲線圖
                metrics = {
                    "accuracy": acc_list,
                    "loss": loss_list,
                    "precision": prec_list,
                    "recall": rec_list,
                    "f1_score": f1_list
                }
                for name, values in metrics.items():
                    plt.plot(range(1, num_epochs+1), values, marker='o')
                    plt.title(f"{name.capitalize()} (BS={bs}, LR={lr}, SEQ={seq_len})")
                    plt.xlabel("Epoch"); plt.ylabel(name.capitalize())
                    if name != "loss": plt.ylim(0, 1.0)
                    plt.grid(True)
                    folder = f"{name}_curves"
                    fname = f"bs{bs}_lr{lr}_seq{seq_len}.png"
                    plt.savefig(os.path.join(RESULT_DIR, folder, fname), bbox_inches='tight')
                    plt.close()

                # 混淆矩陣
                cm = confusion_matrix(y_true, y_pred)
                disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=np.unique(y))
                disp.plot(cmap='Blues', values_format='d')
                plt.title(f"Confusion Matrix\nBS={bs} LR={lr} SEQ={seq_len}")
                plt.savefig(os.path.join(RESULT_DIR, "confusion_matrices", f"bs{bs}_lr{lr}_seq{seq_len}.png"), bbox_inches='tight')
                plt.close()

                # 儲存結果
                results.append({
                    'batch_size': bs, 'learning_rate': lr, 'seq_len': seq_len,
                    'final_acc': acc_list[-1], 'final_loss': loss_list[-1],
                    'precision': prec_list[-1], 'recall': rec_list[-1], 'f1_score': f1_list[-1],
                    'training_time_s': round(t1 - t0, 2)
                })

    df = pd.DataFrame(results)
    csv_path = os.path.join(RESULT_DIR, "cnn_experiment_results.csv")
    df.to_csv(csv_path, index=False)
    print(f"\n📄 All experiment results saved to {csv_path}")

    if not df.empty:
        best = df.loc[df['final_acc'].idxmax()]
        print(f"\n🏆 Best: BS={best['batch_size']} | LR={best['learning_rate']} | SEQ={best['seq_len']} | ACC={best['final_acc']:.4f}")
        return results, best
    else:
        print("\n❗ No valid results (all accuracy = 1.0 were skipped)")
        return [], None

In [None]:
# 設定訓練參數
batch_sizes = [4, 8, 16]
learning_rates = [1e-3, 1e-4]
seq_lens = [4, 8, 10]
num_epochs = 100

# 顯示總組數
total_combinations = len(batch_sizes) * len(learning_rates) * len(seq_lens)
print(f"🔍 總共訓練組數：{total_combinations}，每組訓練 {num_epochs} epochs")

# 執行網格搜尋訓練
results, best = train_and_search_cnn(batch_sizes, learning_rates, seq_lens, num_epochs=num_epochs)

# 輸出最佳參數與指標
if best is not None:
    print("\n🎯 最佳參數組合：")
    print(f"Batch Size     = {best['batch_size']}")
    print(f"Learning Rate  = {best['learning_rate']}")
    print(f"Sequence Length= {best['seq_len']}")
    print(f"Final Accuracy = {best['final_acc']:.4f}")
    print(f"Final Loss     = {best['final_loss']:.4f}")
    print(f"Precision      = {best['precision']:.4f}")
    print(f"Recall         = {best['recall']:.4f}")
    print(f"F1-score       = {best['f1_score']:.4f}")
    print(f"Training Time  = {best['training_time_s']} 秒")
else:
    print("❗ 沒有有效結果（準確率全部為 1.0）")

🔍 總共訓練組數：18，每組訓練 100 epochs

🧪 BS=4 | LR=0.001 | SEQ=4
📊 Total sequences loaded: 10116
Epoch 1/100 | Acc: 0.9733 | Loss: 0.1370
Epoch 2/100 | Acc: 0.9758 | Loss: 0.0961
Epoch 3/100 | Acc: 0.9768 | Loss: 0.0810
Epoch 4/100 | Acc: 0.9788 | Loss: 0.0767
Epoch 5/100 | Acc: 0.9802 | Loss: 0.0721
Epoch 6/100 | Acc: 0.9847 | Loss: 0.0650
Epoch 7/100 | Acc: 0.9837 | Loss: 0.0623
Epoch 8/100 | Acc: 0.9832 | Loss: 0.0662
Epoch 9/100 | Acc: 0.9857 | Loss: 0.0641
Epoch 10/100 | Acc: 0.9872 | Loss: 0.0563
Epoch 11/100 | Acc: 0.9862 | Loss: 0.0579
Epoch 12/100 | Acc: 0.9847 | Loss: 0.0578
Epoch 13/100 | Acc: 0.9857 | Loss: 0.0530
Epoch 14/100 | Acc: 0.9827 | Loss: 0.0655
Epoch 15/100 | Acc: 0.9867 | Loss: 0.0484
Epoch 16/100 | Acc: 0.9857 | Loss: 0.0488
Epoch 17/100 | Acc: 0.9872 | Loss: 0.0530
Epoch 18/100 | Acc: 0.9842 | Loss: 0.0625
Epoch 19/100 | Acc: 0.9862 | Loss: 0.0515
Epoch 20/100 | Acc: 0.9852 | Loss: 0.0592
Epoch 21/100 | Acc: 0.9852 | Loss: 0.0507
Epoch 22/100 | Acc: 0.9857 | Loss: 0.057

存結果

In [None]:
EXPORT_DIR = "/content/drive/MyDrive/Colab Notebooks/test/new"
os.makedirs(EXPORT_DIR, exist_ok=True)

In [None]:
import shutil

# 模型清單（依照你訓練過的模型命名）
model_names = ["LSTM", "GRU", "MLP", "SVM", "CNN"]

# 每種圖表類型（包含新增的 precision 與 recall）
curve_folders = [
    "accuracy_curves",
    "loss_curves",
    "f1_score_curves",
    "confusion_matrices",
    "accuracy_train_vs_val",
    "loss_train_vs_val",
]

# 匯出路徑
EXPORT_DIR = "/content/drive/MyDrive/Colab Notebooks/test/new"
os.makedirs(EXPORT_DIR, exist_ok=True)

for model in model_names:
    model_result_path = f"./models/{model}/result"
    export_model_path = os.path.join(EXPORT_DIR, model)
    os.makedirs(export_model_path, exist_ok=True)

    # 匯出實驗 CSV 檔
    csv_name = f"{model.lower()}_experiment_results.csv"
    csv_path = os.path.join(model_result_path, csv_name)
    if os.path.exists(csv_path):
        shutil.copy(csv_path, os.path.join(export_model_path, "experiment_results.csv"))

    # 匯出圖表資料夾
    for folder in curve_folders:
        src_folder = os.path.join(model_result_path, folder)
        dst_folder = os.path.join(export_model_path, folder)
        if os.path.exists(src_folder):
            shutil.copytree(src_folder, dst_folder, dirs_exist_ok=True)

print("✅ 所有模型的結果已儲存至：", EXPORT_DIR)