In [137]:
import torch
import torch.nn as nn
from transformers import Wav2Vec2Processor, Wav2Vec2Model
from torch.utils.data import Dataset, DataLoader
import torchaudio
import pandas as pd
import numpy as np
import random
import os
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import librosa

In [138]:
# Config 클래스 정의
class Config:
    SR = 32000
    TARGET_SR = 16000
    N_MFCC = 40
    # Dataset
    ROOT_FOLDER = './'
    # Training
    N_CLASSES = 2
    BATCH_SIZE = 32
    N_EPOCHS = 5
    LR = 3e-4
    # Others
    SEED = 42

CONFIG = Config()

In [139]:
# Seed 설정
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_everything(CONFIG.SEED)  # Seed 고정

In [140]:
# 사용자 정의 모델 클래스
class CustomWav2Vec2Classifier(nn.Module):
    def __init__(self, model_name="facebook/wav2vec2-base", num_labels=2):
        super(CustomWav2Vec2Classifier, self).__init__()
        self.wav2vec2 = Wav2Vec2Model.from_pretrained(model_name)
        self.classifier = nn.Linear(self.wav2vec2.config.hidden_size, num_labels)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, input_values, attention_mask=None):
        outputs = self.wav2vec2(input_values, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state
        logits = self.classifier(hidden_states[:, 0, :])  # [CLS] 토큰에 해당하는 hidden state 사용
        return self.softmax(logits)

In [141]:
# 데이터셋 클래스 정의
class AudioDataset(Dataset):
    def __init__(self, csv_file, root_dir, processor, target_sr):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.processor = processor
        self.target_sr = target_sr

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        audio_path = f"{self.root_dir}{self.annotations.iloc[idx, 1]}"
        label = self.annotations.iloc[idx, 2]
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"File {audio_path} not found")
        try:
            waveform, sample_rate = torchaudio.load(audio_path)
            waveform = waveform.numpy().squeeze()
            waveform = librosa.resample(waveform, orig_sr=CONFIG.SR, target_sr=self.target_sr)
            waveform = torch.tensor(waveform).unsqueeze(0)
        except Exception as e:
            raise Exception(f"Error loading {audio_path}: {e}")
        inputs = self.processor(waveform, sampling_rate=self.target_sr, return_tensors="pt", padding=True)
        inputs = {key: torch.squeeze(value) for key, value in inputs.items()}
        label = 1 if label == 'real' else 0
        return inputs, label

In [142]:
# 데이터 로드
df = pd.read_csv('./train.csv')
test_df = pd.read_csv('./test.csv')

# train/validation 데이터 분할
train_df, val_df = train_test_split(df, test_size=0.2, random_state=CONFIG.SEED, stratify=df['label'])


In [143]:

# 모델과 프로세서 로드
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
model = CustomWav2Vec2Classifier(model_name="facebook/wav2vec2-base", num_labels=CONFIG.N_CLASSES)


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [144]:
def collate_fn(batch):
    input_values = [item[0]['input_values'] for item in batch]
    attention_masks = [item[0].get('attention_mask') for item in batch]
    labels = [item[1] for item in batch]
    input_values = torch.nn.utils.rnn.pad_sequence(input_values, batch_first=True)
    if attention_masks[0] is not None:
        attention_masks = torch.nn.utils.rnn.pad_sequence(attention_masks, batch_first=True)
    else:
        attention_masks = None
    labels = torch.tensor(labels)
    return {'input_values': input_values, 'attention_mask': attention_masks}, labels


In [145]:
# 데이터셋 및 데이터로더 생성
train_dataset = AudioDataset(csv_file='train.csv', root_dir='', processor=processor, target_sr=CONFIG.TARGET_SR)
val_dataset = AudioDataset(csv_file='train.csv', root_dir='', processor=processor, target_sr=CONFIG.TARGET_SR)

train_loader = DataLoader(train_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False,collate_fn=collate_fn)


In [146]:
# 모델 학습
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=CONFIG.LR)
criterion = nn.CrossEntropyLoss()
model.train()

for epoch in range(CONFIG.N_EPOCHS):
    total_loss = 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}")
    for inputs, labels in progress_bar:
        inputs = {key: value.to(device) if value is not None else None for key, value in inputs.items()}
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs['input_values'], attention_mask=inputs.get('attention_mask'))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        progress_bar.set_postfix(loss=total_loss / len(train_loader))
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader)}")

Epoch 1:   5%|▌         | 93/1733 [2:08:39<37:48:40, 83.00s/it, loss=0.0375] 


KeyboardInterrupt: 

In [None]:
# 예측 함수 정의
def predict(file_path):
    model.eval()
    with torch.no_grad():
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File {file_path} not found")
        try:
            waveform, sample_rate = torchaudio.load(file_path)
            waveform = waveform.numpy().squeeze()
            waveform = librosa.resample(waveform, orig_sr=CONFIG.SR, target_sr=CONFIG.TARGET_SR)
            waveform = torch.tensor(waveform).unsqueeze(0)
        except Exception as e:
            raise Exception(f"Error loading {file_path}: {e}")
        inputs = processor(waveform, sampling_rate=CONFIG.TARGET_SR, return_tensors="pt", padding=True)
        inputs = {key: torch.squeeze(value).to(device) for key, value in inputs.items()}
        outputs = model(inputs['input_values'], attention_mask=inputs.get('attention_mask'))
        probabilities = outputs.cpu().numpy()
        return probabilities

In [None]:
# 모델 예측 및 제출 파일 생성
predictions = []
progress_bar = tqdm(test_df.iterrows(), total=len(test_df), desc="Predicting")
for index, row in progress_bar:
    file_path = f"/Users/songseungju/Downloads/open/test/{row['path']}"
    probs = predict(file_path)
    predictions.append([row['id'], probs[0][1], probs[0][0]])

# 예측 결과를 baseline_submit.csv 형식으로 저장
submission_df = pd.DataFrame(predictions, columns=['id', 'fake', 'real'])
submission_df.to_csv('baseline_submit.csv', index=False)