In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Switch the working directory to the unilm/beats folder on the mounted Drive.
# NOTE(review): presumably required so that `from BEATs import ...` resolves — confirm.
%cd /content/drive/MyDrive/unilm/beats

In [None]:
# Replace locale.getpreferredencoding so that every later call reports "UTF-8".
# NOTE(review): presumably a workaround for Colab shell commands failing under a non-UTF-8 locale — confirm.
import locale
locale.getpreferredencoding = lambda: "UTF-8"

In [None]:
!pip install audiomentations

In [None]:
import pandas as pd
import os
import torch
from torch.utils.data import Dataset, DataLoader
import librosa
import torch.nn.functional as F
import os
import shutil
import librosa
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd
import soundfile as sf
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Gain
import torch
import torch.nn as nn
import torch.optim as optim
from BEATs import BEATs, BEATsConfig

In [None]:
# Augmentation pipeline: each transform below is configured with p=0.5.
augment = Compose(
    transforms=[
        AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),
        TimeStretch(min_rate=0.8, max_rate=1.2, p=0.5),
        Gain(min_gain_in_db=-6, max_gain_in_db=6, p=0.5),
    ]
)


def augment_audio(audio, sr):
    """Run one waveform through the module-level ``augment`` pipeline.

    Args:
        audio: Waveform samples (in this notebook, the array returned by
            ``librosa.load``).
        sr: Sample rate of ``audio`` in Hz.

    Returns:
        Whatever the pipeline returns for these samples.
    """
    augmented = augment(samples=audio, sample_rate=sr)
    return augmented

# Source folders (one per class) and destination folders (one per split)
normal_folder = "/content/drive/MyDrive/unilm/beats/alpha_data/alpha_AIhub_data/3.차량주행음"
abnormal_folder = "/content/drive/MyDrive/unilm/beats/alpha_data/alpha_beats_eval_data"
train_folder = "/content/drive/MyDrive/unilm/beats/alpha_data/train"
test_folder = "/content/drive/MyDrive/unilm/beats/alpha_data/test"

# Create the destination folders if they are missing
for split_dir in (train_folder, test_folder):
    os.makedirs(split_dir, exist_ok=True)

# Collect the full paths of the .wav files of each class
normal_files = [
    os.path.join(normal_folder, entry)
    for entry in os.listdir(normal_folder)
    if entry.endswith('.wav')
]
abnormal_files = [
    os.path.join(abnormal_folder, entry)
    for entry in os.listdir(abnormal_folder)
    if entry.endswith('.wav')
]

# 80/20 train/test split per class, with a fixed seed
normal_train, normal_test = train_test_split(normal_files, test_size=0.2, random_state=42)
abnormal_train, abnormal_test = train_test_split(abnormal_files, test_size=0.2, random_state=42)

# Copy the original recordings into their split folder (train first, then test)
copy_plan = (
    (train_folder, normal_train + abnormal_train),
    (test_folder, normal_test + abnormal_test),
)
for destination, sources in copy_plan:
    for src_path in sources:
        shutil.copy(src_path, os.path.join(destination, os.path.basename(src_path)))

# Augment every training file and write the augmented copies into the train folder
sr = 16000  # sample rate used both for loading and for writing
augmented_files = []
augmented_labels = []

# Pair each file with its class label up front (0 = normal, 1 = abnormal) instead of
# testing `f in normal_train` (a linear list scan) for every augmented copy.
labeled_train_files = [(f, 0) for f in normal_train] + [(f, 1) for f in abnormal_train]

for f, label in labeled_train_files:
    audio, _ = librosa.load(f, sr=sr)
    # splitext only strips the final extension; split('.')[0] would also cut a stem
    # such as "rec.01" down to "rec", so different recordings could overwrite each other.
    stem = os.path.splitext(os.path.basename(f))[0]
    for i in range(3):  # three augmented copies per file
        augmented_audio = augment_audio(audio, sr)
        augmented_filename = f"aug_{stem}_{i}.wav"
        augmented_filepath = os.path.join(train_folder, augmented_filename)
        sf.write(augmented_filepath, augmented_audio, sr)  # saved with soundfile
        augmented_files.append(augmented_filename)
        augmented_labels.append(label)

# File names (relative to the split folder): originals first, augmented copies last
train_files = [os.path.basename(path) for path in normal_train + abnormal_train] + augmented_files
test_files = [os.path.basename(path) for path in normal_test + abnormal_test]

# Matching labels in the same order (0 = normal, 1 = abnormal)
train_labels = [0 for _ in normal_train] + [1 for _ in abnormal_train] + augmented_labels
test_labels = [0 for _ in normal_test] + [1 for _ in abnormal_test]

# One table per split
train_df = pd.DataFrame({"filename": train_files, "label": train_labels})
test_df = pd.DataFrame({"filename": test_files, "label": test_labels})

# Write both tables to CSV (without the index column)
train_csv_path = "/content/drive/MyDrive/unilm/beats/alpha_data/train_data.csv"
test_csv_path = "/content/drive/MyDrive/unilm/beats/alpha_data/test_data.csv"
for frame, csv_target in ((train_df, train_csv_path), (test_df, test_csv_path)):
    frame.to_csv(csv_target, index=False)

print(f"Train files: {len(train_files)}")
print(f"Test files: {len(test_files)}")
print(f"Train CSV saved at: {train_csv_path}")
print(f"Test CSV saved at: {test_csv_path}")

In [None]:
# Dataset definition
class AudioDataset(Dataset):
    """Dataset of labelled audio clips listed in a CSV file.

    The CSV is read with pandas and must contain a ``filename`` column (joined
    onto ``audio_folder`` to locate the clip) and a ``label`` column.

    Args:
        csv_path: Path of the CSV file.
        audio_folder: Folder that contains the files named in the CSV.
        sample_rate: Rate passed to ``librosa.load``. Defaults to 16000, the
            value that used to be hard-coded.
    """

    def __init__(self, csv_path, audio_folder, sample_rate=16000):
        self.data = pd.read_csv(csv_path)
        self.audio_folder = audio_folder
        self.sample_rate = sample_rate

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load the clip at row ``idx``.

        Returns:
            ``(audio, label)``: the waveform as a float32 tensor and the label
            as a float32 scalar tensor.
        """
        row = self.data.iloc[idx]
        filepath = os.path.join(self.audio_folder, row['filename'])
        label = torch.tensor(row['label'], dtype=torch.float32)

        # Load the audio at the configured sample rate
        audio, _ = librosa.load(filepath, sr=self.sample_rate)
        audio = torch.tensor(audio, dtype=torch.float32)

        return audio, label

# Collate function: right-pad every waveform with zeros to the longest one in the batch
def pad_collate_fn(batch):
    """Collate ``(waveform, label)`` pairs into two batched tensors.

    Args:
        batch: Sequence of ``(waveform, label)`` pairs, where each waveform is
            a 1-D tensor (only ``size(0)`` is padded).

    Returns:
        ``(inputs, labels)``: the stacked, zero-padded waveforms and the labels
        as a float32 tensor.
    """
    waveforms = [pair[0] for pair in batch]
    targets = [pair[1] for pair in batch]

    longest = max(wave.size(0) for wave in waveforms)
    padded = []
    for wave in waveforms:
        missing = longest - wave.size(0)
        padded.append(F.pad(wave, (0, missing)))

    stacked = torch.stack(padded, dim=0)
    return stacked, torch.tensor(targets, dtype=torch.float32)

# Build the train/test datasets and loaders
train_csv = "/content/drive/MyDrive/unilm/beats/alpha_data/train_data.csv"
test_csv = "/content/drive/MyDrive/unilm/beats/alpha_data/test_data.csv"

train_folder = "/content/drive/MyDrive/unilm/beats/alpha_data/train"
test_folder = "/content/drive/MyDrive/unilm/beats/alpha_data/test"

train_dataset = AudioDataset(csv_path=train_csv, audio_folder=train_folder)
test_dataset = AudioDataset(csv_path=test_csv, audio_folder=test_folder)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=pad_collate_fn)
# No shuffling for evaluation: the validation loss is averaged per batch and each batch is
# zero-padded to its longest clip, so a fixed batch composition keeps the loss comparable
# from one epoch to the next.
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=pad_collate_fn)

# len(loader) is the number of batches; report the number of samples instead.
print(f"Train set size: {len(train_dataset)}")
print(f"Test set size: {len(test_dataset)}")

### 기존 모델

In [None]:
from sklearn.metrics import roc_auc_score
import numpy as np
import torch

# AUC / partial-AUC computation for the bare BEATs backbone
def calculate_auc_pauc(test_loader, model, pauc_fpr_range=(0.0, 0.1)):
    """Score a feature extractor on ``test_loader`` and report AUC and partial AUC.

    The score of a clip is the sigmoid of the first component of its
    time-averaged ``extract_features`` output.

    Args:
        test_loader: Iterable of ``(inputs, labels)`` batches.
        model: Object exposing ``eval()`` and
            ``extract_features(inputs, skip_predictor=True)``.
        pauc_fpr_range: ``(low, high)`` pair; ``high`` is passed to
            ``roc_auc_score`` as ``max_fpr``, ``low`` is only printed.

    Returns:
        ``(auc_value, pauc_value)``.

    Raises:
        ValueError: If fewer than two distinct labels were collected.
    """
    model.eval()
    all_probs, all_labels = [], []

    # Collect one score and one label per sample, without tracking gradients
    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            batch_inputs = batch_inputs.to("cuda")
            batch_labels = batch_labels.to("cuda")
            features, _ = model.extract_features(batch_inputs, skip_predictor=True)
            pooled = features.mean(dim=1)  # mean over dim 1 (the time axis per the original note)
            scores = torch.sigmoid(pooled[:, 0])  # first feature component used as the score
            all_probs.extend(scores.cpu().numpy())
            all_labels.extend(batch_labels.cpu().numpy())

    all_probs = np.array(all_probs)
    all_labels = np.array(all_labels)

    # Both classes must be present before AUC is computed
    distinct_labels = np.unique(all_labels)
    if len(distinct_labels) < 2:
        raise ValueError("Test set contains only one class. AUC/PAUC cannot be calculated.")

    upper_fpr = pauc_fpr_range[1]
    auc_value = roc_auc_score(all_labels, all_probs)
    pauc_value = roc_auc_score(all_labels, all_probs, max_fpr=upper_fpr)

    print(f"AUC (overall 0.0-1.0): {auc_value:.4f}")
    print(f"PAUC (FPR {pauc_fpr_range[0]}-{pauc_fpr_range[1]}): {pauc_value:.4f}")

    return auc_value, pauc_value

# Run the baseline evaluation.
# `base_model` is first created in a later cell of this notebook, so this cell raised a
# NameError on a fresh kernel. Build the pretrained BEATs backbone here instead.
checkpoint_path = '/content/drive/MyDrive/unilm/beats/BEATs_iter3.pt'
checkpoint = torch.load(checkpoint_path)

cfg = BEATsConfig({**checkpoint['cfg'], "finetuned_model": True})
base_model = BEATs(cfg)
base_model.load_state_dict(checkpoint['model'], strict=False)
base_model = base_model.to("cuda")  # calculate_auc_pauc moves its inputs to CUDA
base_model.eval()

auc, pauc = calculate_auc_pauc(test_loader, base_model, pauc_fpr_range=(0.0, 0.1))

print(f"Final AUC: {auc:.4f}")
print(f"Final PAUC: {pauc:.4f}")

### weight loss 적용

In [None]:
# BEATs-based binary classifier (frozen backbone + dropout + linear head)
class BEATsBinaryClassifier(nn.Module):
    """Binary classification head on top of a frozen feature extractor.

    Args:
        base_model: Module exposing
            ``extract_features(x, padding_mask=..., skip_predictor=True)``
            that returns ``(features, _)`` with a last dimension of 768.
    """

    def __init__(self, base_model):
        super(BEATsBinaryClassifier, self).__init__()
        self.base_model = base_model
        self.dropout = nn.Dropout(0.1)  # dropout probability of the head
        self.fc = nn.Linear(768, 1)  # 768 features -> one logit

    def train(self, mode=True):
        """Switch the head between train/eval mode; the backbone always stays in eval mode.

        ``nn.Module.train`` recurses into every child, so a plain
        ``model.train()`` would also put the backbone into training mode even
        though ``forward`` treats it as a fixed feature extractor.
        """
        super(BEATsBinaryClassifier, self).train(mode)
        self.base_model.eval()
        return self

    def forward(self, x, padding_mask=None):
        """Return one logit per input clip, shape ``(batch, 1)``."""
        with torch.no_grad():  # the feature extractor is frozen
            features, _ = self.base_model.extract_features(x, padding_mask=padding_mask, skip_predictor=True)
        features = features.mean(dim=1)  # mean over dim 1 (the time axis per the original note)
        features = self.dropout(features)
        logits = self.fc(features)
        return logits

# Load the pretrained BEATs checkpoint
checkpoint_path = '/content/drive/MyDrive/unilm/beats/BEATs_iter3.pt'
checkpoint = torch.load(checkpoint_path)

cfg = BEATsConfig({**checkpoint['cfg'], "finetuned_model": True})  # BEATs configuration
base_model = BEATs(cfg)
base_model.load_state_dict(checkpoint['model'], strict=False)
base_model.eval()

# Wrap the backbone in the binary classifier
binary_model = BEATsBinaryClassifier(base_model).to("cuda")

# Per-class weights
from collections import Counter

# Read the labels from the dataset's DataFrame. Iterating over train_dataset would run
# __getitem__ (and therefore decode an audio file) for every sample just to get its label.
labels = [int(label) for label in train_dataset.data['label']]
label_counts = Counter(labels)  # label distribution

# Inverse-frequency weight of each class
total_count = sum(label_counts.values())
class_weights = torch.tensor([total_count / label_counts[0], total_count / label_counts[1]], dtype=torch.float32).to("cuda")

print(f"Class weights: {class_weights}")

# pos_weight only rescales the positive term while negatives keep weight 1, so it has to be
# the ratio of the two class weights (= #negatives / #positives). Passing class_weights[1]
# alone (= total / #positives) over-weights the positive class.
criterion = nn.BCEWithLogitsLoss(pos_weight=class_weights[1] / class_weights[0])

# Optimizer
optimizer = optim.Adam(binary_model.parameters(), lr=0.0001)

# Training loop
num_epochs = 10
early_stopping_patience = 5  # stop after this many epochs without improvement
no_improvement_count = 0
best_val_loss = float('inf')
best_model_path = "/content/drive/MyDrive/PretrainedSED/sigmoid_binary_best.pt"
# torch.save does not create missing folders, so make sure the target folder exists.
os.makedirs(os.path.dirname(best_model_path), exist_ok=True)

for epoch in range(num_epochs):
    binary_model.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to("cuda"), labels.to("cuda")
        optimizer.zero_grad()

        logits = binary_model(inputs)
        loss = criterion(logits.view(-1), labels)  # weighted loss
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)
    print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.4f}")

    # Validation
    # NOTE(review): model selection uses test_loader, the same data the evaluation cell
    # scores afterwards; a separate validation split would avoid that.
    binary_model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to("cuda"), labels.to("cuda")
            logits = binary_model(inputs)
            probs = torch.sigmoid(logits.view(-1))
            preds = (probs > 0.3).float()  # decision threshold used for the accuracy figure
            loss = criterion(logits.view(-1), labels)
            val_loss += loss.item()
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    val_loss /= len(test_loader)
    accuracy = correct / total
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {accuracy:.4f}")

    # Keep the checkpoint with the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        no_improvement_count = 0
        torch.save({
            'model_state_dict': binary_model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'epoch': epoch,
            'best_val_loss': best_val_loss
        }, best_model_path)
        print(f"Model saved with Validation Loss: {best_val_loss:.4f}")
    else:
        no_improvement_count += 1
        print(f"No improvement in Validation Loss for {no_improvement_count} epochs.")

    # Early stopping
    if no_improvement_count >= early_stopping_patience:
        print(f"Early stopping triggered after {epoch + 1} epochs.")
        break

# Restore the best checkpoint so that the evaluation below scores the saved model rather
# than the weights of the last epoch. Skipped if no epoch ever improved on the initial
# best_val_loss, i.e. nothing was saved by this run.
if best_val_loss < float('inf'):
    best_state = torch.load(best_model_path)
    binary_model.load_state_dict(best_state['model_state_dict'])
    print(f"Restored best model from epoch {best_state['epoch'] + 1}.")

In [None]:
from sklearn.metrics import roc_auc_score

# AUC / partial-AUC computation for the binary classifier
def calculate_auc_pauc(test_loader, model, pauc_fpr_range=(0.0, 0.1)):
    """Score ``model`` on ``test_loader`` and report AUC and partial AUC.

    Args:
        test_loader: Iterable of ``(inputs, labels)`` batches.
        model: Callable module returning one logit per sample.
        pauc_fpr_range: ``(low, high)`` pair; ``high`` is passed to
            ``roc_auc_score`` as ``max_fpr``, ``low`` is only printed.

    Returns:
        ``(auc_value, pauc_value)``.
    """
    model.eval()
    all_probs = []
    all_labels = []

    # Collect predicted probabilities and true labels
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to("cuda"), labels.to("cuda")
            logits = model(inputs)
            # view(-1) always yields a 1-D tensor. squeeze() turned a final batch of a
            # single sample into a 0-d tensor, which list.extend cannot iterate over.
            probs = torch.sigmoid(logits.view(-1))
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_probs = np.array(all_probs)
    all_labels = np.array(all_labels)

    # AUC over the whole FPR range
    auc_value = roc_auc_score(all_labels, all_probs)

    # Partial AUC up to the upper FPR bound
    pauc_value = roc_auc_score(all_labels, all_probs, max_fpr=pauc_fpr_range[1])

    print(f"AUC (overall 0.0-1.0): {auc_value:.4f}")
    print(f"PAUC (FPR {pauc_fpr_range[0]}-{pauc_fpr_range[1]}): {pauc_value:.4f}")

    return auc_value, pauc_value

# Run the evaluation on test_loader with the current binary_model
auc, pauc = calculate_auc_pauc(test_loader, binary_model, pauc_fpr_range=(0.0, 0.1))

### hard negative 사용

In [None]:
# BEATs-based binary classifier (frozen backbone + dropout + linear head)
class BEATsBinaryClassifier(nn.Module):
    """Binary classification head on top of a frozen feature extractor.

    Args:
        base_model: Module exposing
            ``extract_features(x, padding_mask=..., skip_predictor=True)``
            that returns ``(features, _)`` with a last dimension of 768.
    """

    def __init__(self, base_model):
        super(BEATsBinaryClassifier, self).__init__()
        self.base_model = base_model
        self.dropout = nn.Dropout(0.1)  # dropout probability of the head
        self.fc = nn.Linear(768, 1)  # 768 features -> one logit

    def train(self, mode=True):
        """Switch the head between train/eval mode; the backbone always stays in eval mode.

        ``nn.Module.train`` recurses into every child, so a plain
        ``model.train()`` would also put the backbone into training mode even
        though ``forward`` treats it as a fixed feature extractor.
        """
        super(BEATsBinaryClassifier, self).train(mode)
        self.base_model.eval()
        return self

    def forward(self, x, padding_mask=None):
        """Return one logit per input clip, shape ``(batch, 1)``."""
        with torch.no_grad():  # the feature extractor is frozen
            features, _ = self.base_model.extract_features(x, padding_mask=padding_mask, skip_predictor=True)
        features = features.mean(dim=1)  # mean over dim 1 (the time axis per the original note)
        features = self.dropout(features)
        logits = self.fc(features)
        return logits

# Hard negative mining
def hard_negative_mining(loader, model, threshold=0.5, device=None):
    """Collect the negative samples that ``model`` classifies as positive.

    Args:
        loader: Iterable of ``(inputs, labels)`` batches.
        model: Module returning one logit per sample.
        threshold: Probability above which a sample counts as predicted positive.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter ("cuda" if the model has no parameters).

    Returns:
        ``(hard_negatives, hard_labels)``. When at least one hard negative was
        found, both are tensors (the inputs zero-padded to a common length and
        stacked, the labels as a 1-D float tensor); otherwise both are empty lists.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cuda")

    # Remember the current mode: mining runs in eval mode, but the caller usually keeps
    # training afterwards and should get the model back in the mode it passed in.
    was_training = model.training
    model.eval()
    hard_negatives = []
    hard_labels = []

    try:
        with torch.no_grad():
            for inputs, labels in loader:
                inputs, labels = inputs.to(device), labels.to(device)
                logits = model(inputs)
                probs = torch.sigmoid(logits.view(-1))
                preds = (probs > threshold).float()

                # False positives: label 0 but predicted 1
                incorrect_indices = (preds == 1) & (labels == 0)
                for idx in torch.where(incorrect_indices)[0]:
                    hard_negatives.append(inputs[idx].cpu())
                    hard_labels.append(labels[idx].cpu())
    finally:
        model.train(was_training)

    if len(hard_negatives) > 0:
        # Samples from different batches may have different lengths, and torch.stack
        # requires equal sizes, so pad everything to the longest sample first.
        max_len = max(neg.size(0) for neg in hard_negatives)
        hard_negatives = [F.pad(neg, (0, max_len - neg.size(0))) for neg in hard_negatives]
        hard_negatives = torch.stack(hard_negatives)
        hard_labels = torch.stack(hard_labels).float()

    return hard_negatives, hard_labels

# Load the pretrained BEATs checkpoint
checkpoint_path = '/content/drive/MyDrive/unilm/beats/BEATs_iter3.pt'
checkpoint = torch.load(checkpoint_path)

# Checkpoint configuration with the "finetuned_model" entry switched on
beats_settings = dict(checkpoint['cfg'])
beats_settings["finetuned_model"] = True
cfg = BEATsConfig(beats_settings)

# Build the backbone; strict=False tolerates keys that do not match the checkpoint
base_model = BEATs(cfg)
base_model.load_state_dict(checkpoint['model'], strict=False)
base_model.eval()

# Wrap the backbone in the binary classifier and move it to the GPU
binary_model = BEATsBinaryClassifier(base_model)
binary_model = binary_model.to("cuda")

# Unweighted loss on logits (sigmoid is applied inside the loss) and Adam optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(params=binary_model.parameters(), lr=1e-4)

# Training loop
num_epochs = 10
early_stopping_patience = 5  # stop after this many epochs without improvement
no_improvement_count = 0
best_val_loss = float('inf')
best_model_path = "/content/drive/MyDrive/PretrainedSED/sigmoid_binary_best.pt"
# torch.save does not create missing folders, so make sure the target folder exists.
os.makedirs(os.path.dirname(best_model_path), exist_ok=True)
hard_negative_chunk_size = 4  # samples per forward pass in the hard-negative step

for epoch in range(num_epochs):
    binary_model.train()
    train_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to("cuda"), labels.to("cuda")
        optimizer.zero_grad()

        logits = binary_model(inputs)
        loss = criterion(logits.view(-1), labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)
    print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.4f}")

    # Hard negative mining on the training data
    hard_negatives, hard_labels = hard_negative_mining(train_loader, binary_model, threshold=0.5)
    if len(hard_negatives) > 0:
        print(f"Found {len(hard_negatives)} hard negative samples. Adding them to training.")
        # Mining puts the model in eval mode; this is a training step, so switch back.
        binary_model.train()
        optimizer.zero_grad()
        # Still one optimizer step on the mean loss over all hard negatives, but the
        # gradient is accumulated chunk by chunk so the whole set is never pushed
        # through the model (or onto the GPU) in a single pass.
        num_hard = len(hard_negatives)
        for start in range(0, num_hard, hard_negative_chunk_size):
            chunk_inputs = hard_negatives[start:start + hard_negative_chunk_size].to("cuda")
            chunk_labels = hard_labels[start:start + hard_negative_chunk_size].to("cuda")
            logits = binary_model(chunk_inputs)
            # Weight each chunk's mean loss by its share of the samples.
            loss = criterion(logits.view(-1), chunk_labels) * (chunk_labels.size(0) / num_hard)
            loss.backward()
        optimizer.step()

    # Validation
    # NOTE(review): model selection uses test_loader, the same data the evaluation cell
    # scores afterwards; a separate validation split would avoid that.
    binary_model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to("cuda"), labels.to("cuda")
            logits = binary_model(inputs)
            probs = torch.sigmoid(logits.view(-1))
            preds = (probs > 0.3).float()  # decision threshold used for the accuracy figure
            loss = criterion(logits.view(-1), labels)
            val_loss += loss.item()
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    val_loss /= len(test_loader)
    accuracy = correct / total
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {accuracy:.4f}")

    # Keep the checkpoint with the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        no_improvement_count = 0
        torch.save({
            'model_state_dict': binary_model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'epoch': epoch,
            'best_val_loss': best_val_loss
        }, best_model_path)
        print(f"Model saved with Validation Loss: {best_val_loss:.4f}")
    else:
        no_improvement_count += 1
        print(f"No improvement in Validation Loss for {no_improvement_count} epochs.")

    # Early stopping
    if no_improvement_count >= early_stopping_patience:
        print(f"Early stopping triggered after {epoch + 1} epochs.")
        break

# Restore the best checkpoint so that the evaluation below scores the saved model rather
# than the weights of the last epoch. Skipped if no epoch ever improved on the initial
# best_val_loss, i.e. nothing was saved by this run.
if best_val_loss < float('inf'):
    best_state = torch.load(best_model_path)
    binary_model.load_state_dict(best_state['model_state_dict'])
    print(f"Restored best model from epoch {best_state['epoch'] + 1}.")

In [None]:
from sklearn.metrics import roc_auc_score

# AUC / partial-AUC computation for the binary classifier
def calculate_auc_pauc(test_loader, model, pauc_fpr_range=(0.0, 0.1)):
    """Score ``model`` on ``test_loader`` and report AUC and partial AUC.

    Args:
        test_loader: Iterable of ``(inputs, labels)`` batches.
        model: Callable module returning one logit per sample.
        pauc_fpr_range: ``(low, high)`` pair; ``high`` is passed to
            ``roc_auc_score`` as ``max_fpr``, ``low`` is only printed.

    Returns:
        ``(auc_value, pauc_value)``.
    """
    model.eval()
    all_probs = []
    all_labels = []

    # Collect predicted probabilities and true labels
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to("cuda"), labels.to("cuda")
            logits = model(inputs)
            # view(-1) always yields a 1-D tensor. squeeze() turned a final batch of a
            # single sample into a 0-d tensor, which list.extend cannot iterate over.
            probs = torch.sigmoid(logits.view(-1))
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_probs = np.array(all_probs)
    all_labels = np.array(all_labels)

    # AUC over the whole FPR range
    auc_value = roc_auc_score(all_labels, all_probs)

    # Partial AUC up to the upper FPR bound
    pauc_value = roc_auc_score(all_labels, all_probs, max_fpr=pauc_fpr_range[1])

    print(f"AUC (overall 0.0-1.0): {auc_value:.4f}")
    print(f"PAUC (FPR {pauc_fpr_range[0]}-{pauc_fpr_range[1]}): {pauc_value:.4f}")

    return auc_value, pauc_value

# Run the evaluation on test_loader with the current binary_model
auc, pauc = calculate_auc_pauc(test_loader, binary_model, pauc_fpr_range=(0.0, 0.1))

### hard+weight

In [None]:
# BEATs-based binary classifier (frozen backbone + dropout + linear head)
class BEATsBinaryClassifier(nn.Module):
    """Binary classification head on top of a frozen feature extractor.

    Args:
        base_model: Module exposing
            ``extract_features(x, padding_mask=..., skip_predictor=True)``
            that returns ``(features, _)`` with a last dimension of 768.
    """

    def __init__(self, base_model):
        super(BEATsBinaryClassifier, self).__init__()
        self.base_model = base_model
        self.dropout = nn.Dropout(0.1)  # dropout probability of the head
        self.fc = nn.Linear(768, 1)  # 768 features -> one logit

    def train(self, mode=True):
        """Switch the head between train/eval mode; the backbone always stays in eval mode.

        ``nn.Module.train`` recurses into every child, so a plain
        ``model.train()`` would also put the backbone into training mode even
        though ``forward`` treats it as a fixed feature extractor.
        """
        super(BEATsBinaryClassifier, self).train(mode)
        self.base_model.eval()
        return self

    def forward(self, x, padding_mask=None):
        """Return one logit per input clip, shape ``(batch, 1)``."""
        with torch.no_grad():  # the feature extractor is frozen
            features, _ = self.base_model.extract_features(x, padding_mask=padding_mask, skip_predictor=True)
        features = features.mean(dim=1)  # mean over dim 1 (the time axis per the original note)
        features = self.dropout(features)
        logits = self.fc(features)
        return logits

# Hard negative mining (results padded to a common length)
def hard_negative_mining(loader, model, threshold=0.5, device=None):
    """Collect the negative samples that ``model`` classifies as positive.

    Args:
        loader: Iterable of ``(inputs, labels)`` batches.
        model: Module returning one logit per sample.
        threshold: Probability above which a sample counts as predicted positive.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter ("cuda" if the model has no parameters).

    Returns:
        ``(hard_negatives, hard_labels)``. When at least one hard negative was
        found, both are tensors (the inputs zero-padded to a common length and
        stacked, the labels as a 1-D float32 tensor); otherwise both are empty lists.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cuda")

    # Remember the current mode: mining runs in eval mode, but the caller usually keeps
    # training afterwards and should get the model back in the mode it passed in.
    was_training = model.training
    model.eval()
    hard_negatives = []
    hard_labels = []

    try:
        with torch.no_grad():
            for inputs, labels in loader:
                inputs, labels = inputs.to(device), labels.to(device)
                logits = model(inputs)
                probs = torch.sigmoid(logits.view(-1))
                preds = (probs > threshold).float()

                # False positives: label 0 but predicted 1
                incorrect_indices = (preds == 1) & (labels == 0)
                for idx in torch.where(incorrect_indices)[0]:
                    hard_negatives.append(inputs[idx].cpu())
                    hard_labels.append(labels[idx].cpu())
    finally:
        model.train(was_training)

    if len(hard_negatives) > 0:
        # Bring all samples to the same length before stacking
        max_len = max(neg.size(0) for neg in hard_negatives)
        hard_negatives = [F.pad(neg, (0, max_len - neg.size(0))) for neg in hard_negatives]
        hard_negatives = torch.stack(hard_negatives)
        hard_labels = torch.tensor(hard_labels, dtype=torch.float32)

    return hard_negatives, hard_labels

# Load the pretrained BEATs checkpoint
checkpoint_path = '/content/drive/MyDrive/unilm/beats/BEATs_iter3.pt'
checkpoint = torch.load(checkpoint_path)

cfg = BEATsConfig({**checkpoint['cfg'], "finetuned_model": True})  # BEATs configuration
base_model = BEATs(cfg)
base_model.load_state_dict(checkpoint['model'], strict=False)
base_model.eval()

# Wrap the backbone in the binary classifier
binary_model = BEATsBinaryClassifier(base_model).to("cuda")

# Per-class weights
from collections import Counter

# Read the labels from the dataset's DataFrame. Iterating over train_loader.dataset would
# run __getitem__ (and therefore decode an audio file) for every sample.
labels = [int(label) for label in train_loader.dataset.data['label']]
label_counts = Counter(labels)
total_count = sum(label_counts.values())
class_weights = torch.tensor([total_count / label_counts[0], total_count / label_counts[1]], dtype=torch.float32).to("cuda")

print(f"Class Weights: {class_weights}")

# pos_weight only rescales the positive term while negatives keep weight 1, so it has to be
# the ratio of the two class weights (= #negatives / #positives). Passing class_weights[1]
# alone (= total / #positives) over-weights the positive class.
criterion = nn.BCEWithLogitsLoss(pos_weight=class_weights[1] / class_weights[0])

# Optimizer
optimizer = optim.Adam(binary_model.parameters(), lr=0.0001)

# Training loop
num_epochs = 5
# NOTE(review): with num_epochs = 5 the counter below can reach 4 at most, so early
# stopping never triggers before the loop ends on its own.
early_stopping_patience = 5
no_improvement_count = 0
best_val_loss = float('inf')
best_model_path = "/content/drive/MyDrive/PretrainedSED/sigmoid_binary_best.pt"
# torch.save does not create missing folders, so make sure the target folder exists.
os.makedirs(os.path.dirname(best_model_path), exist_ok=True)
# One gradient scaler for the whole run, so its loss-scale state carries over between
# epochs instead of being reset every time hard negatives are found.
scaler = torch.cuda.amp.GradScaler()

for epoch in range(num_epochs):
    binary_model.train()
    train_loss = 0.0

    # Regular pass over the training data
    for inputs, labels in train_loader:
        inputs, labels = inputs.to("cuda"), labels.to("cuda")
        optimizer.zero_grad()

        logits = binary_model(inputs)
        loss = criterion(logits.view(-1), labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)
    print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.4f}")

    # Hard negative mining on the training data
    hard_negatives, hard_labels = hard_negative_mining(train_loader, binary_model, threshold=0.5)
    if len(hard_negatives) > 0:
        # Mining puts the model in eval mode; these are training steps, so switch back.
        binary_model.train()
        # Extra mini-batch updates on the hard negatives, using mixed precision
        batch_size = 16
        for i in range(0, len(hard_negatives), batch_size):
            batch_negatives = hard_negatives[i:i + batch_size].to("cuda")
            batch_labels = hard_labels[i:i + batch_size].to("cuda")

            optimizer.zero_grad()
            with torch.cuda.amp.autocast():
                logits = binary_model(batch_negatives)
                loss = criterion(logits.view(-1), batch_labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

    # Validation
    # NOTE(review): model selection uses test_loader, the same data the evaluation cell
    # scores afterwards; a separate validation split would avoid that.
    binary_model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to("cuda"), labels.to("cuda")
            logits = binary_model(inputs)
            probs = torch.sigmoid(logits.view(-1))
            preds = (probs > 0.5).float()
            loss = criterion(logits.view(-1), labels)
            val_loss += loss.item()
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    val_loss /= len(test_loader)
    accuracy = correct / total
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {accuracy:.4f}")

    # Keep the checkpoint with the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        no_improvement_count = 0
        torch.save({
            'model_state_dict': binary_model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'epoch': epoch,
            'best_val_loss': best_val_loss
        }, best_model_path)
        print(f"Model saved with Validation Loss: {best_val_loss:.4f}")
    else:
        no_improvement_count += 1
        print(f"No improvement in Validation Loss for {no_improvement_count} epochs.")

    # Early stopping
    if no_improvement_count >= early_stopping_patience:
        print(f"Early stopping triggered after {epoch + 1} epochs.")
        break

# Restore the best checkpoint so that the evaluation below scores the saved model rather
# than the weights of the last epoch. Skipped if no epoch ever improved on the initial
# best_val_loss, i.e. nothing was saved by this run.
if best_val_loss < float('inf'):
    best_state = torch.load(best_model_path)
    binary_model.load_state_dict(best_state['model_state_dict'])
    print(f"Restored best model from epoch {best_state['epoch'] + 1}.")

In [None]:
from sklearn.metrics import roc_auc_score

# AUC / partial-AUC computation for the binary classifier
def calculate_auc_pauc(test_loader, model, pauc_fpr_range=(0.0, 0.1)):
    """Score ``model`` on ``test_loader`` and report AUC and partial AUC.

    Args:
        test_loader: Iterable of ``(inputs, labels)`` batches.
        model: Callable module returning one logit per sample.
        pauc_fpr_range: ``(low, high)`` pair; ``high`` is passed to
            ``roc_auc_score`` as ``max_fpr``, ``low`` is only printed.

    Returns:
        ``(auc_value, pauc_value)``.
    """
    model.eval()
    all_probs = []
    all_labels = []

    # Collect predicted probabilities and true labels
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to("cuda"), labels.to("cuda")
            logits = model(inputs)
            # view(-1) always yields a 1-D tensor. squeeze() turned a final batch of a
            # single sample into a 0-d tensor, which list.extend cannot iterate over.
            probs = torch.sigmoid(logits.view(-1))
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_probs = np.array(all_probs)
    all_labels = np.array(all_labels)

    # AUC over the whole FPR range
    auc_value = roc_auc_score(all_labels, all_probs)

    # Partial AUC up to the upper FPR bound
    pauc_value = roc_auc_score(all_labels, all_probs, max_fpr=pauc_fpr_range[1])

    print(f"AUC (overall 0.0-1.0): {auc_value:.4f}")
    print(f"PAUC (FPR {pauc_fpr_range[0]}-{pauc_fpr_range[1]}): {pauc_value:.4f}")

    return auc_value, pauc_value

# Run the evaluation on test_loader with the current binary_model
auc, pauc = calculate_auc_pauc(test_loader, binary_model, pauc_fpr_range=(0.0, 0.1))