In [7]:
import numpy as np
import pandas as pd
import os
import librosa
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import RandomOverSampler
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import random
import warnings

warnings.filterwarnings('ignore')

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

class Config:
    SR = 32000
    N_MFCC = 13
    ROOT_FOLDER = './'
    N_CLASSES = 2
    BATCH_SIZE = 96
    N_EPOCHS = 5
    LR = 3e-4
    SEED = 42

CONFIG = Config()

def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CONFIG.SEED)

# 데이터 로드 및 전처리
df = pd.read_csv('./train.csv')

def label_encoder(column):
    le = LabelEncoder().fit(column)
    print(column.name, le.classes_)
    return le.transform(column)

df['class'] = label_encoder(df['label'])

# MFCC 특징 추출
def get_mfcc_feature(df, train_mode=True):
    features = []
    labels = []
    for _, row in tqdm(df.iterrows()):
        y, sr = librosa.load(row['path'], sr=CONFIG.SR)
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=CONFIG.N_MFCC)
        mfcc = np.mean(mfcc.T, axis=0)
        features.append(mfcc)

        if train_mode:
            label = row['class']
            labels.append(label)

    return features, labels

features, labels = get_mfcc_feature(df, True)
feature_df = pd.DataFrame({'features': features, 'class': labels})

X = np.array(feature_df['features'].tolist())
y = np.array(feature_df['class'].tolist())

# 데이터 불균형 해결
ros = RandomOverSampler(random_state=CONFIG.SEED)
X_resampled, y_resampled = ros.fit_resample(X, y)
y_resampled = torch.tensor(y_resampled).long()  # 정수형으로 변환
y_resampled = torch.nn.functional.one_hot(y_resampled, num_classes=CONFIG.N_CLASSES).float()

# 데이터 분할
X_train, X_val, y_train, y_val = train_test_split(X_resampled, y_resampled, test_size=0.2, random_state=CONFIG.SEED)

# PyTorch Dataset 클래스 정의
class CustomDataset(Dataset):
    def __init__(self, mfcc, label):
        self.mfcc = mfcc
        self.label = label

    def __len__(self):
        return len(self.mfcc)

    def __getitem__(self, index):
        return self.mfcc[index], self.label[index]

train_dataset = CustomDataset(X_train, y_train)
val_dataset = CustomDataset(X_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False)

# MLP 모델 정의
class MLP(nn.Module):
    def __init__(self, input_dim=CONFIG.N_MFCC, hidden_dims=[128, 256, 128], output_dim=CONFIG.N_CLASSES):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dims[0])
        self.fc2 = nn.Linear(hidden_dims[0], hidden_dims[1])
        self.fc3 = nn.Linear(hidden_dims[1], hidden_dims[2])
        self.fc4 = nn.Linear(hidden_dims[2], output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.relu(self.fc3(x))
        x = self.dropout(x)
        x = self.fc4(x)
        return torch.sigmoid(x)

# 모델 학습 함수 정의
from sklearn.metrics import roc_auc_score

def train(model, optimizer, train_loader, val_loader, device):
    model.to(device)
    criterion = nn.BCELoss().to(device)
    
    best_val_score = 0
    best_model = None
    
    for epoch in range(1, CONFIG.N_EPOCHS+1):
        model.train()
        train_loss = []
        for features, labels in tqdm(iter(train_loader)):
            features = features.float().to(device)
            labels = labels.float().to(device)
            
            optimizer.zero_grad()
            
            output = model(features)
            loss = criterion(output, labels)
            
            loss.backward()
            optimizer.step()
            
            train_loss.append(loss.item())
                    
        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val AUC : [{_val_score:.5f}]')
            
        if best_val_score < _val_score:
            best_val_score = _val_score
            best_model = model
    
    return best_model

def multiLabel_AUC(y_true, y_scores):
    auc_scores = []
    for i in range(y_true.shape[1]):
        auc = roc_auc_score(y_true[:, i], y_scores[:, i])
        auc_scores.append(auc)
    mean_auc_score = np.mean(auc_scores)
    return mean_auc_score
    
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss, all_labels, all_probs = [], [], []
    
    with torch.no_grad():
        for features, labels in tqdm(iter(val_loader)):
            features = features.float().to(device)
            labels = labels.float().to(device)
            
            probs = model(features)
            
            loss = criterion(probs, labels)

            val_loss.append(loss.item())

            all_labels.append(labels.cpu().numpy())
            all_probs.append(probs.cpu().numpy())
        
        _val_loss = np.mean(val_loss)

        all_labels = np.concatenate(all_labels, axis=0)
        all_probs = np.concatenate(all_probs, axis=0)
        
        auc_score = multiLabel_AUC(all_labels, all_probs)
    
    return _val_loss, auc_score

model = MLP()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CONFIG.LR)

infer_model = train(model, optimizer, train_loader, val_loader, device)

# 테스트 데이터 예측
test = pd.read_csv('./test.csv')
test_features, _ = get_mfcc_feature(test, False)
test_features = np.array(test_features)
test_dataset = CustomDataset(test_features, torch.zeros((len(test_features), CONFIG.N_CLASSES)))
test_loader = DataLoader(test_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False)

def inference(model, test_loader, device):
    model.to(device)
    model.eval()
    predictions = []
    with torch.no_grad():
        for features, _ in tqdm(iter(test_loader)):
            features = features.float().to(device)
            
            probs = model(features)

            probs  = probs.cpu().detach().numpy()
            predictions += probs.tolist()
    return predictions

preds = inference(infer_model, test_loader, device)

submit = pd.read_csv('./sample_submission.csv')
submit.iloc[:, 1:] = preds
submit.head()

submit.to_csv('./sg3.csv', index=False)


label ['fake' 'real']


55438it [15:58, 57.84it/s]
100%|██████████| 464/464 [00:05<00:00, 80.91it/s]
100%|██████████| 116/116 [00:00<00:00, 256.95it/s]


Epoch [1], Train Loss : [0.94270] Val Loss : [0.67075] Val AUC : [0.73635]


100%|██████████| 464/464 [00:05<00:00, 79.37it/s]
100%|██████████| 116/116 [00:00<00:00, 318.59it/s]


Epoch [2], Train Loss : [0.65205] Val Loss : [0.59030] Val AUC : [0.78640]


100%|██████████| 464/464 [00:06<00:00, 77.29it/s]
100%|██████████| 116/116 [00:00<00:00, 266.15it/s]


Epoch [3], Train Loss : [0.59321] Val Loss : [0.51249] Val AUC : [0.84822]


100%|██████████| 464/464 [00:06<00:00, 74.24it/s]
100%|██████████| 116/116 [00:00<00:00, 265.95it/s]


Epoch [4], Train Loss : [0.52001] Val Loss : [0.42587] Val AUC : [0.90458]


100%|██████████| 464/464 [00:06<00:00, 75.70it/s]
100%|██████████| 116/116 [00:00<00:00, 198.67it/s]


Epoch [5], Train Loss : [0.45694] Val Loss : [0.38388] Val AUC : [0.92793]


50000it [22:39, 36.79it/s]
100%|██████████| 521/521 [00:01<00:00, 379.69it/s]
