In [2]:
import os
import glob
import random
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split, KFold
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import numpy as np


In [3]:

# 사용자 정의 Dataset 클래스
class BatteryDataset(Dataset):
    """Dataset that reads one serialized sample per file path, on demand.

    Every file is deserialized with ``torch.load`` and must unpack into a
    ``(data, meta)`` pair, where ``meta`` supports ``meta['lengths']`` and
    ``meta['label']``.

    Args:
        file_paths: Sized, indexable collection of sample file paths.
    """

    def __init__(self, file_paths):
        # Only the paths are stored; nothing is read from disk until indexing.
        self.files = file_paths

    def __len__(self):
        """Number of sample files."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(features, lengths, label)``.

        ``label`` (the brand label) is coerced to a Python ``int``;
        ``lengths`` is passed through unchanged.
        """
        # NOTE(review): torch.load unpickles the file -- only use trusted data.
        features, meta = torch.load(self.files[idx])
        return features, meta['lengths'], int(meta['label'])

# DataLoader 생성 헬퍼
def get_dataloaders(train_files, val_files, batch_size=32, num_workers=4):
    """Create the training and validation DataLoaders for two file lists.

    Args:
        train_files: File paths wrapped in a BatteryDataset for training.
        val_files: File paths wrapped in a BatteryDataset for validation.
        batch_size: Batch size used by both loaders.
        num_workers: Worker-process count used by both loaders.

    Returns:
        ``(train_loader, val_loader)``. The training loader shuffles its
        samples; the validation loader keeps them in order.
    """
    train_set, val_set = BatteryDataset(train_files), BatteryDataset(val_files)

    # Settings common to both loaders; collate_fn=None selects the default collation.
    shared_kwargs = dict(batch_size=batch_size, num_workers=num_workers, collate_fn=None)

    train_loader = DataLoader(train_set, shuffle=True, **shared_kwargs)
    valid_loader = DataLoader(val_set, shuffle=False, **shared_kwargs)
    return train_loader, valid_loader

# LSTM 기반 분류 모델
class LSTMCls(nn.Module):
    """Stacked LSTM encoder followed by a linear classification head.

    Args:
        in_feat: Number of input features per time step.
        hidden_size: Hidden-state width of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout value handed to ``nn.LSTM``.
        num_classes: Number of output logits.
    """

    def __init__(self, in_feat=8, hidden_size=256, num_layers=3,
                 dropout=0.3, num_classes=2):
        super().__init__()
        recurrent_cfg = dict(
            input_size=in_feat,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,  # inputs are laid out (batch, time, feature)
            dropout=dropout,
        )
        self.lstm = nn.LSTM(**recurrent_cfg)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x, lengths):
        """Return class logits computed from each sequence's last valid step.

        Args:
            x: Batch-first input sequences.
            lengths: Per-sequence lengths; ``lengths - 1`` is used as the
                time index (assumes a 1-D integer tensor -- TODO confirm).
        """
        # pack_padded_sequence could be used here instead if desired.
        step_outputs, _ = self.lstm(x)
        # Pair every batch row with the index of its final real time step.
        batch_rows = torch.arange(step_outputs.size(0))
        last_steps = lengths - 1
        return self.fc(step_outputs[batch_rows, last_steps])


In [4]:

# 전체 브랜드 대상  학습 함수
def train_all_brands(
    device,
    dataset_root='./dataset/battery',
    model_root='./models',
    utils_root='./five_fold_utils',
    holdout_ratio=0.2,
    k_folds=5,
    fold_epochs=5,
    final_epochs=10,
    batch_size=32,
    lr=1e-3
):
    """Train, evaluate and save one LSTM classifier per battery brand.

    For every ``battery_brand*`` directory under ``dataset_root``:

    1. Keep only the ``.pkl`` files whose car ID is listed in that brand's
       ``ind_sorted`` / ``ood_sorted`` entries.
    2. Shuffle them with a fixed seed and hold out the trailing
       ``holdout_ratio`` fraction as the test split.
    3. Run K-fold cross-validation on the rest, keeping a *copy* of the
       weights from the epoch with the highest validation accuracy.
    4. Continue training a model initialised from those weights on the whole
       training split, print the hold-out accuracy and save the weights to
       ``<model_root>/<brand_name>_final.pth``.

    Args:
        device: Torch device on which models and batches are placed.
        dataset_root: Directory containing the ``battery_brand<N>`` folders.
        model_root: Output directory for saved weights (created if missing).
        utils_root: Directory containing ``ind_odd_dict{1,2,3}.npz.npy``.
        holdout_ratio: Fraction of the selected files reserved for testing.
        k_folds: Number of cross-validation folds.
        fold_epochs: Training epochs per fold.
        final_epochs: Training epochs for the final model.
        batch_size: Batch size for every DataLoader.
        lr: Adam learning rate.

    Returns:
        None. Progress is printed and weights are written to disk.
    """
    os.makedirs(model_root, exist_ok=True)

    # Per-brand in-/out-of-distribution car ID lists.
    # NOTE(review): allow_pickle=True unpickles arbitrary objects; utils_root
    # must only contain trusted files.
    ind_odd_dicts = {
        i: np.load(os.path.join(utils_root, f'ind_odd_dict{i}.npz.npy'), allow_pickle=True).item()
        for i in (1, 2, 3)
    }

    # glob order is filesystem-dependent; sort so runs are reproducible.
    brand_dirs = sorted(glob.glob(os.path.join(dataset_root, 'battery_brand*')))

    for brand_path in brand_dirs:
        brand_name = os.path.basename(brand_path)
        brand_num = int(brand_name.replace('battery_brand', ''))
        print(f"\n=== {brand_name} 학습 시작 ===")

        # Map each .pkl file to the car ID encoded in its file name.
        all_files = glob.glob(os.path.join(brand_path, '*.pkl'))
        car_id_to_path = {_car_id_from_path(p): p for p in all_files}

        # Restrict to cars listed as in- or out-of-distribution for this brand.
        ind_ids = ind_odd_dicts[brand_num]['ind_sorted']
        ood_ids = ind_odd_dicts[brand_num]['ood_sorted']
        # list(...) forces concatenation; if the IDs were stored as NumPy
        # arrays a bare `+` would add them element-wise instead.
        all_ids = list(ind_ids) + list(ood_ids)
        target_files = [car_id_to_path[i] for i in all_ids if i in car_id_to_path]

        # Seeded shuffle, then hold-out split.
        random.seed(42)
        random.shuffle(target_files)
        split_idx = int(len(target_files) * (1 - holdout_ratio))
        train_files = target_files[:split_idx]
        test_files = target_files[split_idx:]

        # K-fold cross-validation on the training split.
        kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)
        best_acc = 0.0
        best_state = None

        for fold, (tr_idx, val_idx) in enumerate(kf.split(train_files)):
            tr_files = [train_files[i] for i in tr_idx]
            val_files = [train_files[i] for i in val_idx]
            tr_loader, val_loader = get_dataloaders(tr_files, val_files, batch_size=batch_size)

            model = LSTMCls(num_classes=2).to(device)
            optimizer = torch.optim.Adam(model.parameters(), lr=lr)
            criterion = nn.CrossEntropyLoss()

            for epoch in range(1, fold_epochs + 1):
                _train_one_epoch(model, tr_loader, optimizer, criterion, device)
                val_acc = _accuracy(model, val_loader, device)
                print(f"[{brand_name} Fold{fold} Ep{epoch}] Val Acc={val_acc:.4f}")

                # `best_state is None` ensures a snapshot exists even when
                # accuracy never rises above 0.0.
                if best_state is None or val_acc > best_acc:
                    best_acc = val_acc
                    # state_dict() returns references to the live parameters,
                    # which later optimizer steps overwrite; clone to freeze
                    # the weights as they are right now.
                    best_state = {
                        key: tensor.detach().clone()
                        for key, tensor in model.state_dict().items()
                    }

        print(f"최고 검증 정확도({brand_name}): {best_acc:.4f}")

        # Final training on the full training split, starting from the best weights.
        train_loader_full, _ = get_dataloaders(train_files, train_files, batch_size=batch_size)
        final_model = LSTMCls(num_classes=2).to(device)
        if best_state is not None:  # None only when no fold epoch ran
            final_model.load_state_dict(best_state)
        optimizer = torch.optim.Adam(final_model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()

        for epoch in range(1, final_epochs + 1):
            mean_loss = _train_one_epoch(
                final_model, train_loader_full, optimizer, criterion, device
            )
            print(f"[{brand_name} Final Ep{epoch}] Loss={mean_loss:.4f}")

        # Hold-out evaluation; take the second (non-shuffled) loader.
        _, test_loader = get_dataloaders(test_files, test_files, batch_size=batch_size)
        test_acc = _accuracy(final_model, test_loader, device)
        print(f"테스트 정확도({brand_name}): {test_acc:.4f}\n")

        # Persist the final weights.
        save_path = os.path.join(model_root, f"{brand_name}_final.pth")
        torch.save(final_model.state_dict(), save_path)
        print(f"모델 저장: {save_path}\n")


# --- private helpers used by train_all_brands ---

def _car_id_from_path(path):
    """Return the integer that follows the last '_' in the file's base name."""
    stem = os.path.splitext(os.path.basename(path))[0]
    return int(stem.split('_')[-1])


def _train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader``; return the mean batch loss (0.0 if empty)."""
    model.train()
    running_loss = 0.0
    for x, lengths, y in loader:
        x, lengths, y = x.to(device), lengths.to(device), y.to(device)
        optimizer.zero_grad()
        loss = criterion(model(x, lengths), y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    n_batches = len(loader)
    return running_loss / n_batches if n_batches else 0.0


def _accuracy(model, loader, device):
    """Return the fraction of correct argmax predictions over ``loader`` (0.0 if empty)."""
    model.eval()
    correct = total = 0
    with torch.no_grad():
        for x, lengths, y in loader:
            x, lengths, y = x.to(device), lengths.to(device), y.to(device)
            pred = model(x, lengths).argmax(dim=1)
            correct += (pred == y).sum().item()
            total += y.size(0)
    # Guard against an empty split instead of raising ZeroDivisionError.
    return correct / total if total else 0.0
if __name__ == '__main__':
    # Use the GPU when CUDA is available, otherwise fall back to the CPU.
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    # Train every brand with the default hyper-parameters.
    train_all_brands(device)

In [None]:
# Run training for every brand with all hyper-parameters spelled out.
# The stray ':' that followed the closing parenthesis was a SyntaxError and
# has been removed. `device` must already exist in the kernel namespace.
train_all_brands(
    device,
    dataset_root='./dataset/battery',
    model_root='./models',
    utils_root='./five_fold_utils',
    holdout_ratio=0.2,
    k_folds=5,
    fold_epochs=5,
    final_epochs=10,
    batch_size=32,
    lr=1e-3
)