In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from datetime import datetime
import glob
import torch_directml
import random
import torch.nn.functional as F
import torchaudio.transforms as T

In [36]:
if torch_directml.is_available():
    device = torch_directml.device()
    print(f"DirectML device: {device}")
else:
    device = torch.device("cpu")
    print("DirectML not available, using CPU.")

DirectML device: privateuseone:0


In [None]:
class LogMelAudioDataset(Dataset):
    def __init__(self, npy_log_mel_base_dir, file_names_list, labels_list, mean=None, std=None):
        self.npy_log_mel_base_dir = npy_log_mel_base_dir
        self.file_names = file_names_list
        self.labels = labels_list
        self.mean = mean
        self.std = std

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, idx):
        file_name_id = self.file_names[idx]
        log_mel_path = os.path.join(self.npy_log_mel_base_dir, file_name_id + '.npy')
        try:
            log_mel_map = np.load(log_mel_path).astype(np.float32)
        except FileNotFoundError as e:
            print(f"오류: 원본 데이터 파일 로드 실패 (ID: {file_name_id}) - {e}")
            raise e

        if self.mean is not None and self.std is not None:
            log_mel_map = (log_mel_map - self.mean) / self.std

        log_mel_tensor = torch.from_numpy(log_mel_map).float().unsqueeze(0)

        if self.is_train and self.spec_augment is not None:
            log_mel_tensor = self.spec_augment(log_mel_tensor)

        if not self.is_test:
            label = self.labels[idx]
            label_tensor = torch.tensor(label, dtype=torch.long)
            return log_mel_tensor, label_tensor
        else:
            return log_mel_tensor, file_name_id

In [38]:
class SingleBranchLogMelCNN(nn.Module):
    def __init__(self, num_classes,
                 input_channels, input_height, input_width,
                 fc_hidden_dim=256,
                 dropout_rate=0.5):
        super(SingleBranchLogMelCNN, self).__init__()
        self.target_height = 16
        self.target_width = 6
        self.conv1 = nn.Conv2d(input_channels, 16, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU()
        self.adaptive_pool = nn.AdaptiveMaxPool2d((self.target_height, self.target_width))
        with torch.no_grad():
            dummy_input = torch.zeros(1, input_channels, input_height, input_width)
            x = self.pool1(self.relu1(self.conv1(dummy_input)))
            x = self.pool2(self.relu2(self.conv2(x)))
            x = self.relu3(self.conv3(x))
            out = self.adaptive_pool(x)
            self.flattened_size = out.numel()
        self.classifier_fc = nn.Sequential(
            nn.Linear(self.flattened_size, fc_hidden_dim),
            nn.BatchNorm1d(fc_hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(fc_hidden_dim, num_classes)
        )
    def forward(self, x):
        out = self.pool1(self.relu1(self.conv1(x)))
        out = self.pool2(self.relu2(self.conv2(out)))
        out = self.relu3(self.conv3(out))
        out = self.adaptive_pool(out)
        out = out.view(-1, self.flattened_size)
        output = self.classifier_fc(out)
        return output

In [None]:
def calculate_feature_stats(file_names_list, base_dir):
    all_values_sum = 0.0
    all_values_sum_sq = 0.0
    total_elements_count = 0
    print(f"'{base_dir}'에서 통계치 계산 중 ({len(file_names_list)}개 파일)...")
    for fname_id in file_names_list:
        path = os.path.join(base_dir, fname_id + '.npy')
        try:
            data = np.load(path).astype(np.float64)
            all_values_sum += np.sum(data)
            all_values_sum_sq += np.sum(data**2)
            total_elements_count += data.size
        except FileNotFoundError: continue
        except Exception as e: continue

    if total_elements_count == 0:
        return 0.0, 1.0
        
    mean = all_values_sum / total_elements_count
    variance = (all_values_sum_sq / total_elements_count) - (mean**2)
    std = np.sqrt(max(variance, 1e-12))
    std = max(std, 1e-8)
    return mean, std

In [40]:
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0.0, path='best_model_checkpoint.pt'):
        self.patience = patience
        self.min_delta = min_delta
        self.path = path
        self.counter = 0
        self.best_loss = np.inf
        self.early_stop = False
        self.best_model_saved = False

    def __call__(self, val_loss, model):
        if self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            # print(f"Validation loss decreased ({self.best_loss:.6f}). Saving model to {self.path}")
            torch.save(model.state_dict(), self.path)
            self.best_model_saved = True
        else:
            self.counter += 1
            # print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True

In [None]:
PERSISTENT_STORAGE_PATH = "C:\\\\머신러닝\\\\kaggle2\\\\kaggle"
os.makedirs(PERSISTENT_STORAGE_PATH, exist_ok=True)
MODEL_SAVE_DIR = os.path.join(PERSISTENT_STORAGE_PATH, 'model')
SUBMISSION_SAVE_DIR = os.path.join(PERSISTENT_STORAGE_PATH, 'submission')
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)
os.makedirs(SUBMISSION_SAVE_DIR, exist_ok=True)
CSV_FILE_PATH = os.path.join(PERSISTENT_STORAGE_PATH, 'train.csv')
TEST_CSV_FILE_PATH = os.path.join(PERSISTENT_STORAGE_PATH, 'test.csv')
NPY_LOG_MEL_TRAIN_DIR_FOR_DATASET = os.path.join(PERSISTENT_STORAGE_PATH, 'train_log_mel')
NPY_LOG_MEL_TEST_DIR_FOR_DATASET= os.path.join(PERSISTENT_STORAGE_PATH, 'test_log_mel')
N_SPLITS = 10
FILENAME_COLUMN_IN_CSV = "ID"
TARGET_COLUMN_IN_CSV = "Target"
CLASSES = ["I", "E"]
NUM_CLASSES = len(CLASSES)
BATCH_SIZE = 32
NUM_EPOCHS = 50
LEARNING_RATE = 0.0001
RANDOM_SEED = 42
FC_HIDDEN_DIM_MODEL = 128
DROPOUT_RATE_MODEL = 0.5

# input channels 1개!!!, height, width
INPUT_CHANNELS = 1    
LOG_MEL_HEIGHT, LOG_MEL_WIDTH = 128, 44

EARLY_STOPPING_PATIENCE = 10
EARLY_STOPPING_MIN_DELTA = 0.001
NUM_WORKERS = 0

# --- 데이터 준비 (CSV 로드 및 레이블 변환) ---
try:
    train_val_metadata_df = pd.read_csv(CSV_FILE_PATH)
    train_val_all_ids = train_val_metadata_df[FILENAME_COLUMN_IN_CSV].tolist()
    train_val_raw_labels = train_val_metadata_df[TARGET_COLUMN_IN_CSV].tolist()
except FileNotFoundError:
    print(f"오류: 로컬 학습 CSV 파일({CSV_FILE_PATH})을 찾을 수 없습니다.")
    exit()

label_to_int = {class_name: i for i, class_name in enumerate(CLASSES)}
train_val_all_labels = [label_to_int[str(label_str)] for label_str in train_val_raw_labels]
print(f"총 {len(train_val_all_ids)}개의 학습/검증 샘플 정보를 CSV 파일에서 로드 완료.")

print("\n[추론용] 전체 학습 데이터에 대한 정규화 통계치를 계산합니다...")
overall_log_mel_mean, overall_log_mel_std = calculate_feature_stats(train_val_all_ids, NPY_LOG_MEL_TRAIN_DIR_FOR_DATASET)
print(f"--- 계산 완료: Mean={overall_log_mel_mean:.4f}, Std={overall_log_mel_std:.4f} ---")

총 4000개의 학습/검증 샘플 정보를 CSV 파일에서 로드 완료.

[추론용] 전체 학습 데이터에 대한 정규화 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (4000개 파일)...
--- 계산 완료: Mean=-34.6940, Std=20.1266 ---


In [None]:
# 랜덤 시드 고정 함수
def set_seed(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(RANDOM_SEED)
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_SEED)
X = np.array(train_val_all_ids)
y = np.array(train_val_all_labels)

for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
    print(f"\n{'='*25}\n   Fold {fold+1}/{N_SPLITS} 학습 시작\n{'='*25}")

    train_filenames, val_filenames = X[train_idx], X[val_idx]
    train_labels, val_labels = y[train_idx], y[val_idx]

    print(f"Fold {fold+1}의 학습 데이터로 통계치를 계산합니다...")
    # 통계치 계산은 증강되지 않은 원본 데이터로 수행해야 하므로 이 부분은 그대로 둡니다.
    fold_mean, fold_std = calculate_feature_stats(train_filenames, NPY_LOG_MEL_TRAIN_DIR_FOR_DATASET)
    

    train_dataset = LogMelAudioDataset(
        NPY_LOG_MEL_TRAIN_DIR_FOR_DATASET, train_filenames.tolist(), train_labels.tolist(),
        mean=fold_mean, std=fold_std
    )
    val_dataset = LogMelAudioDataset(
        NPY_LOG_MEL_TRAIN_DIR_FOR_DATASET, val_filenames.tolist(), val_labels.tolist(),
        mean=fold_mean, std=fold_std
    )
    
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
    
    model = SingleBranchLogMelCNN(NUM_CLASSES, INPUT_CHANNELS, LOG_MEL_HEIGHT, LOG_MEL_WIDTH, FC_HIDDEN_DIM_MODEL, DROPOUT_RATE_MODEL).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)
    current_best_model_path = os.path.join(MODEL_SAVE_DIR, f'logmel_skfold_model_fold_{fold}.pt')
    early_stopper = EarlyStopping(patience=EARLY_STOPPING_PATIENCE, min_delta=EARLY_STOPPING_MIN_DELTA, path=current_best_model_path)

    for epoch in range(NUM_EPOCHS):
        model.train()
        for log_mels, labels in train_loader:
            log_mels, labels = log_mels.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(log_mels)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        model.eval()
        with torch.no_grad():
            val_loss = 0
            for log_mels, labels in val_loader:
                log_mels, labels = log_mels.to(device), labels.to(device)
                outputs = model(log_mels)
                val_loss += criterion(outputs, labels).item() * log_mels.size(0)
        epoch_val_loss = val_loss / len(val_loader.dataset)
        if early_stopper.early_stop:
            print(f"Fold {fold+1}, Epoch {epoch+1}: Early stopping.")
            break
    print(f"Fold {fold+1} 학습 완료.")

print(f"\n{'='*20} Stratified K-Fold 학습 완료 {'='*20}")


   Fold 1/10 학습 시작
Fold 1의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (3600개 파일)...
Fold 1 학습 완료.

   Fold 2/10 학습 시작
Fold 2의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (3600개 파일)...
Fold 2 학습 완료.

   Fold 3/10 학습 시작
Fold 3의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (3600개 파일)...
Fold 3 학습 완료.

   Fold 4/10 학습 시작
Fold 4의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (3600개 파일)...
Fold 4 학습 완료.

   Fold 5/10 학습 시작
Fold 5의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (3600개 파일)...
Fold 5 학습 완료.

   Fold 6/10 학습 시작
Fold 6의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (3600개 파일)...
Fold 6 학습 완료.

   Fold 7/10 학습 시작
Fold 7의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_log_mel'에서 통계치 계산 중 (3600개 파일)...
Fold 7 학습 완료.

   Fold 8/10 학습 시작
Fold 8의 학습 데이터로 통계치를 계산합니다...
'C:\\머신러닝\\kaggle2\\kaggle\train_

In [None]:
model_paths = glob.glob(os.path.join(MODEL_SAVE_DIR, 'logmel_skfold_model_fold_*.pt'))
if not model_paths:
    print("오류: 학습된 K-Fold 모델 파일이 없습니다.")
else:
    print(f"총 {len(model_paths)}개의 Fold 모델로 앙상블 추론을 시작합니다.")

    df_test = pd.read_csv(TEST_CSV_FILE_PATH)
    test_file_ids = df_test[FILENAME_COLUMN_IN_CSV].tolist()
    test_dataset = LogMelAudioDataset(
        npy_log_mel_base_dir=NPY_LOG_MEL_TEST_DIR_FOR_DATASET,
        file_names_list=test_file_ids,
        labels_list=None,
        mean=overall_log_mel_mean,
        std=overall_log_mel_std,   
        is_test=True
    )
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

    all_model_probs = []
    for path in model_paths:
        inference_model = SingleBranchLogMelCNN(NUM_CLASSES, INPUT_CHANNELS, LOG_MEL_HEIGHT, LOG_MEL_WIDTH, FC_HIDDEN_DIM_MODEL, DROPOUT_RATE_MODEL).to(device)
        inference_model.load_state_dict(torch.load(path, map_location=device))
        inference_model.eval()
        current_model_probs = []
        with torch.no_grad():
            for log_mels_test, _ in test_loader:
                log_mels_test = log_mels_test.to(device)
                logits = inference_model(log_mels_test)
                probs = F.softmax(logits, dim=1)
                current_model_probs.append(probs.cpu())
        all_model_probs.append(torch.cat(current_model_probs, dim=0))
    ensembled_probs_tensor = torch.stack(all_model_probs)
    avg_probs = ensembled_probs_tensor.mean(dim=0)
    _, final_preds_indices = torch.max(avg_probs, 1)

    idx_to_label = {v: k for k, v in label_to_int.items()}
    predicted_labels_str = [idx_to_label[p_idx] for p_idx in final_preds_indices.tolist()]
    current_time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    submission_filename = f'submission_logmel_skfold_ensemble_{current_time_str}.csv'
    submission_save_path = os.path.join(SUBMISSION_SAVE_DIR, submission_filename)
    submission_df = pd.DataFrame({'ID': test_file_ids, 'Target': predicted_labels_str})
    submission_df.to_csv(submission_save_path, index=False)
    print(f"\n앙상블 추론 완료.")

총 15개의 Fold 모델로 앙상블 추론을 시작합니다.


  inference_model.load_state_dict(torch.load(path, map_location=device))



앙상블 추론 완료.
