In [2]:
!pip install kaggle --upgrade # install the latest version of the Kaggle API client
from google.colab import files
# NOTE(review): files.upload() returns the uploaded file contents, and as the last
# expression of the cell they are echoed into the saved output (API key included).
# Clear this cell's output before sharing the notebook.
files.upload() # upload the Kaggle API token file (kaggle.json)



Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"<REDACTED>","key":"<REDACTED>"}'}

In [3]:
!mkdir -p ~/.kaggle # create the Kaggle configuration folder
!cp kaggle.json ~/.kaggle/ # copy the uploaded kaggle.json into that folder
!chmod 600 ~/.kaggle/kaggle.json # restrict permissions on the token file
!ls -1ha kaggle.json # check that the file is present (prints the name kaggle.json if it exists)

kaggle.json


In [4]:
# Download the awsaf49/asvpoof-2019-dataset archive with the Kaggle CLI (uses the token installed above).
! kaggle datasets download -d awsaf49/asvpoof-2019-dataset

Dataset URL: https://www.kaggle.com/datasets/awsaf49/asvpoof-2019-dataset
License(s): ODC Attribution License (ODC-By)
Downloading asvpoof-2019-dataset.zip to /content
100% 23.6G/23.6G [04:41<00:00, 36.9MB/s]
100% 23.6G/23.6G [04:41<00:00, 89.9MB/s]


In [5]:
# Extract the downloaded archive into the current working directory (produces very long output).
!unzip asvpoof-2019-dataset.zip

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049006.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049007.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049008.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049009.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049010.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049011.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049012.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049013.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049014.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049015.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049016.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049017.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049018.flac  
  inflating: PA/PA/ASVspoof2019_PA_train/flac/PA_T_0049019.flac  
  inflating: PA/PA/ASVspoo

In [6]:
!pip install resampy
import numpy as np
import pandas as pd
import os
import torch
import librosa
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm # Jupyter 노트북에서 진행 상황을 보여주는 라이브러리
import IPython # 오디오 파일 재생을 위한 라이브러리
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import RandomOverSampler
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Reshape, MaxPooling2D, Dropout, Conv2D, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

Collecting resampy
  Downloading resampy-0.4.3-py3-none-any.whl (3.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: resampy
Successfully installed resampy-0.4.3


In [7]:
def readtxtfile(path):
    """Read a text file and return its lines without line terminators.

    Args:
        path: location of the text file to read.

    Returns:
        list[str]: one entry per line of the file, in file order.
    """
    with open(path, 'r') as handle:
        contents = handle.read()
    return contents.splitlines()

def getlabels(path):
    """Build a ``{file name: label}`` mapping from a protocol text file.

    Every non-blank line is split on whitespace; the second field is used as
    the key and the last field as the value. Blank or whitespace-only lines
    are skipped instead of raising ``IndexError`` / producing empty keys.

    Args:
        path: location of the protocol file.

    Returns:
        dict[str, str]: second field of each line mapped to its last field.

    Raises:
        IndexError: if a non-blank line has fewer than two fields.
    """
    filename2label = {}
    for line in readtxtfile(path):
        fields = line.split()  # split once; tolerant of repeated/trailing whitespace
        if not fields:
            continue  # blank line
        filename2label[fields[1]] = fields[-1]
    return filename2label

# Location of the training audio files and of the matching protocol (label) file,
# relative to the directory where the archive was extracted.
train_audio_files_path = 'LA/LA/ASVspoof2019_LA_train/flac'
train_labels_path = 'LA/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt'
# Mapping from file name (2nd field of each protocol line) to label (last field).
filename2label = getlabels(train_labels_path)

In [8]:
# Per-file MFCC summary vectors and their labels, in protocol order.
# NOTE(review): `labels` is rebound by later cells (training loop / evaluation);
# nothing visible below appears to consume `data` — confirm it is still needed.
data = []
labels = []

for filename, label in tqdm(filename2label.items()):
    filepath = os.path.join(train_audio_files_path, filename + '.flac')
    # sr=None keeps the file's native sampling rate instead of resampling.
    audio, sample_rate = librosa.load(filepath, sr=None)
    # 40 MFCC coefficients per frame.
    mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=40)
    # Average over the first axis of the transposed matrix, i.e. one mean value
    # per coefficient (presumably the mean over time frames — TODO confirm layout).
    mfccs_mean = np.mean(mfccs.T, axis=0)
    data.append(mfccs_mean)
    labels.append(label)

  0%|          | 0/25380 [00:00<?, ?it/s]

In [9]:
class ASVSpoof(torch.utils.data.Dataset):
    """Dataset of fixed-length audio clips with integer class ids.

    Each item is loaded with torchaudio from ``audio_dir_path``, mixed down to
    a single channel, truncated or right-padded with zeros to ``num_samples``
    samples, and finally passed through ``transforms``.

    Args:
        audio_dir_path: directory that contains the ``.flac`` files.
        num_samples: number of samples every clip is cut/padded to.
        filename2label: mapping from file name (without extension) to label.
        transforms: callable applied to the fixed-length waveform tensor.

    Attributes:
        labels: integer class id per file, in ``filename2label`` order.
        label2id / id2label: mapping between label values and class ids.
    """

    def __init__(self, audio_dir_path, num_samples, filename2label, transforms):
        super().__init__()
        self.audio_dir_path = audio_dir_path
        self.num_samples = num_samples
        self.audio_file_names = self.get_audio_file_names(filename2label)
        self.labels, self.label2id, self.id2label = self.get_labels(filename2label)
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(transformed_signal, class_id)`` for the file at ``index``."""
        file_path = os.path.join(self.audio_dir_path, self.audio_file_names[index])
        # The sampling rate reported by torchaudio is not used here.
        # NOTE(review): assumes every file already has the rate the transform expects — confirm.
        signal, _ = torchaudio.load(file_path)
        signal = self.mix_down_if_necessary(signal)
        signal = self.cut_if_necessary(signal)
        signal = self.right_pad_if_necessary(signal)
        signal = self.transforms(signal)
        return signal, self.labels[index]

    def __len__(self):
        """Number of files in the dataset."""
        return len(self.labels)

    def get_audio_file_names(self, filename2label):
        """Return the keys of ``filename2label`` with the ``.flac`` extension appended."""
        return [name + '.flac' for name in filename2label.keys()]

    def get_labels(self, filename2label):
        """Encode the label values as integer class ids.

        Ids are assigned in sorted label order, so the mapping is identical for
        every dataset instance and every run (iterating a plain ``set`` gives
        no such guarantee).

        Returns:
            tuple: ``(labels, label2id, id2label)`` where ``labels`` is the list
            of class ids in ``filename2label`` order.
        """
        raw_labels = list(filename2label.values())
        unique_labels = sorted(set(raw_labels))
        id2label = dict(enumerate(unique_labels))
        label2id = {label: idx for idx, label in id2label.items()}
        labels = [label2id[label] for label in raw_labels]
        return labels, label2id, id2label

    def mix_down_if_necessary(self, signal):
        """Average multi-channel audio to a single channel; mono input is returned unchanged."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def cut_if_necessary(self, signal):
        """Truncate ``signal`` to ``self.num_samples`` samples along dim 1 if it is longer."""
        if signal.shape[1] > self.num_samples:
            # Use the instance attribute, not a notebook-level global.
            signal = signal[:, :self.num_samples]
        return signal

    def right_pad_if_necessary(self, signal):
        """Zero-pad ``signal`` on the right up to ``self.num_samples`` samples if it is shorter."""
        length = signal.shape[1]
        if self.num_samples > length:
            # (left, right) padding of the last dimension.
            pad_last_dim = (0, self.num_samples - length)
            signal = torch.nn.functional.pad(signal, pad_last_dim)
        return signal

In [10]:
import numpy as np
import torch
from torch import nn
import torchaudio
import os
from tqdm.auto import tqdm

import matplotlib.pyplot as plt
import math
!pip install timm
from sklearn.metrics import roc_auc_score, roc_curve, RocCurveDisplay, f1_score, classification_report, ConfusionMatrixDisplay

Collecting timm
  Downloading timm-1.0.3-py3-none-any.whl (2.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch->timm)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch->timm)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch->timm)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch->timm)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch->timm)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch->timm)
  Using cache

In [11]:
# Audio directory, protocol file and file-name -> label mapping for the dev split
# (used as validation data below).
val_audio_files_path = 'LA/LA/ASVspoof2019_LA_dev/flac'
val_labels_path = 'LA/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.dev.trl.txt'
val_filename2label = getlabels(val_labels_path)
# Same three values for the eval split (used as test data below).
test_audio_files_path = 'LA/LA/ASVspoof2019_LA_eval/flac'
test_labels_path = 'LA/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.eval.trl.txt'
test_filename2label = getlabels(test_labels_path)

In [18]:
# Mel-spectrogram transform shared by all three datasets (64 mel bands).
mel_spectogram = torchaudio.transforms.MelSpectrogram(
    sample_rate = 16000,
    n_fft = 1024,
    hop_length = 512,
    n_mels = 64
)
# Every clip is cut/padded to this many samples (6 * 16000, i.e. 6 s at the
# 16 kHz rate configured above).
num_samples = 6 * 16000
# Train / validation / test datasets built from the corresponding protocol mappings.
train_dataset = ASVSpoof(train_audio_files_path, num_samples, filename2label, mel_spectogram)
val_dataset = ASVSpoof(val_audio_files_path, num_samples, val_filename2label, mel_spectogram)
test_dataset = ASVSpoof(test_audio_files_path, num_samples, test_filename2label, mel_spectogram)

In [19]:
import timm

In [20]:
class Model(nn.Module):
    """Binary classifier on top of a pretrained timm ``resnet200d.ra2_in1k`` backbone.

    The backbone is created for single-channel input and its first 39
    parameter tensors are frozen. All backbone children except the last two
    serve as the feature extractor; a small head (global average pooling,
    flatten, one linear unit, sigmoid) produces one value in [0, 1] per input.
    """

    def __init__(self):
        super().__init__()
        backbone = timm.create_model('resnet200d.ra2_in1k', pretrained = True, in_chans = 1)
        self.model = backbone

        # Freeze the first 39 parameter tensors, in registration order.
        for frozen_param in list(backbone.parameters())[:39]:
            frozen_param.requires_grad = False

        # Reuse every child module of the backbone except its last two.
        feature_blocks = list(backbone.children())[:-2]
        self.features = nn.Sequential(*feature_blocks)

        head = [
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(backbone.num_features, 1),
            nn.Sigmoid(),
        ]
        self.custom_layers = nn.Sequential(*head)

    def forward(self, inputs):
        """Run the feature extractor and the head; returns the sigmoid output."""
        return self.custom_layers(self.features(inputs))

In [None]:
class SimpleANN(nn.Module):
    """Fully connected binary classifier with three hidden ReLU layers.

    NOTE: a later cell of this notebook defines another ``SimpleANN`` that
    shadows this one when both cells are executed.

    Args:
        in_features: size of the flattened input. The default, 12032, is
            1 * 64 * 188 (the value the original hard-coded layer used).
    """

    def __init__(self, in_features=12032):
        super(SimpleANN, self).__init__()
        self.flatten = nn.Flatten()
        self.linear1 = nn.Linear(in_features, 1024)  # first linear layer
        self.relu = nn.ReLU()  # activation shared by all hidden layers
        self.linear2 = nn.Linear(1024, 512)  # second linear layer
        self.linear3 = nn.Linear(512, 256)  # third linear layer
        self.linear4 = nn.Linear(256, 1)  # output layer
        self.sigmoid = nn.Sigmoid()  # output activation

    def forward(self, x):
        """Flatten ``x`` per sample and return one sigmoid value per sample."""
        x = self.flatten(x)
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        x = self.relu(self.linear3(x))
        x = self.linear4(x)
        return self.sigmoid(x)  # sigmoid for binary classification

In [None]:
class SimpleANN(nn.Module):
    """Fully connected binary classifier with three hidden ReLU layers.

    Args:
        in_features: size of the flattened input. The default, 2444, is
            1 * 13 * 188 (the value the original hard-coded layer used).
    """

    def __init__(self, in_features=1 * 13 * 188):
        super(SimpleANN, self).__init__()
        self.flatten = nn.Flatten()
        self.linear1 = nn.Linear(in_features, 1024)  # first linear layer
        self.relu = nn.ReLU()  # activation shared by all hidden layers
        self.linear2 = nn.Linear(1024, 512)  # second linear layer
        self.linear3 = nn.Linear(512, 256)  # third linear layer
        self.linear4 = nn.Linear(256, 1)  # output layer
        self.sigmoid = nn.Sigmoid()  # output activation

    def forward(self, x):
        """Flatten ``x`` per sample and return one sigmoid value per sample."""
        x = self.flatten(x)  # flatten everything except the batch dimension
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        x = self.relu(self.linear3(x))
        x = self.linear4(x)
        return self.sigmoid(x)  # sigmoid for binary classification


In [None]:
class Model1(nn.Module):
    """Binary classifier on top of a pretrained timm ``resnet101`` backbone.

    Mirrors ``Model`` in structure: single-channel backbone with its first 39
    parameter tensors frozen, all but the last two backbone children used as
    the feature extractor, and a pooling + linear + sigmoid head.
    """

    def __init__(self):
        super().__init__()
        self.model = timm.create_model('resnet101', pretrained = True, in_chans = 1)

        # Freeze the first 39 parameter tensors, in registration order.
        trainable_candidates = list(self.model.parameters())
        for frozen_param in trainable_candidates[0:39]:
            frozen_param.requires_grad = False

        # Keep every backbone child except the last two as the feature extractor.
        backbone_children = list(self.model.children())
        self.features = nn.Sequential(*backbone_children[:-2])

        pooling = nn.AdaptiveAvgPool2d(1)
        flatten = nn.Flatten()
        classifier = nn.Linear(self.model.num_features, 1)
        squash = nn.Sigmoid()
        self.custom_layers = nn.Sequential(pooling, flatten, classifier, squash)

    def forward(self, inputs):
        """Run the feature extractor and the head; returns the sigmoid output."""
        extracted = self.features(inputs)
        return self.custom_layers(extracted)

In [21]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = ('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# NOTE(review): the training-loop cell assigns num_epochs again (to 10).
num_epochs = 12
# Binary cross-entropy on probabilities; the models end with a Sigmoid layer.
criterion = nn.BCELoss()
model = Model().to(device)
# Adam with its default hyper-parameters over all parameters of `model`.
optimizer = torch.optim.Adam(model.parameters())

cuda


In [22]:
# Batch iterators for the three splits; training uses small batches of 32,
# validation and test use batches of 1024.
train_loader = torch.utils.data.DataLoader(train_dataset, shuffle = True, batch_size = 32)
val_loader = torch.utils.data.DataLoader(val_dataset, shuffle = True, batch_size = 1024)
test_loader = torch.utils.data.DataLoader(test_dataset, shuffle = True, batch_size = 1024)
# Number of batches per pass over each loader (used to average the summed losses).
t_steps = len(train_loader)
v_steps = len(val_loader)
ts_steps = len(test_loader)

In [23]:
def EER(labels, outputs):
    """Estimate the equal error rate from labels and scores.

    The ROC curve is computed with class 1 as the positive class; the
    operating point where the false-negative rate (1 - TPR) is closest to the
    false-positive rate is located, and the false-positive rate at that point
    is returned.

    Args:
        labels: ground-truth labels.
        outputs: scores for the positive class.

    Returns:
        The false-positive rate at the point where |FNR - FPR| is smallest.
    """
    fpr, tpr, _ = roc_curve(labels, outputs, pos_label=1)
    gap = np.absolute((1 - tpr) - fpr)
    crossover = np.nanargmin(gap)
    return fpr[crossover]

In [None]:
# Training loop: each epoch runs one optimisation pass over train_loader and
# one evaluation pass over val_loader, then prints the mean losses and EERs.
num_epochs = 10
train_losses = []
val_losses = []
torch.cuda.empty_cache()
for epoch in range(num_epochs):
    y_true = []
    y_pred = []
    train_loss = 0.0
    # Validation at the end of the previous epoch switched the model to eval mode.
    model.train()
    loop = tqdm(enumerate(train_loader), total = len(train_loader))
    for batch_idx, (images, labels) in loop:
        loop.set_description(f'Epoch {epoch + 1} / {num_epochs} ')
        # forward pass
        torch.cuda.empty_cache()
        images = images.to(device)
        # BCELoss needs float targets shaped like the (batch, 1) model output.
        # .float() keeps the tensor on `device`, so this works on CPU as well
        # as on CUDA (torch.cuda.FloatTensor would fail without a GPU).
        labels = labels.to(device).reshape(-1, 1).float()

        optimizer.zero_grad()

        outputs = model(images)
        # Keep labels and scores of every batch for the epoch-level EER.
        y_true.append(labels.detach().cpu().numpy())
        y_pred.append(outputs.detach().cpu().numpy())

        loss = criterion(outputs, labels)
        train_loss += loss.item()
        # backward pass
        loss.backward()
        optimizer.step()

        loop.set_postfix(Training_loss = loss.item())

    y_true = np.concatenate(y_true)
    y_pred = np.concatenate(y_pred)
    train_eer = EER(y_true, y_pred)

    # validation every epoch
    y_true = []
    y_pred = []
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        val_loop = tqdm(enumerate(val_loader), total = len(val_loader))
        for val_batch_idx, (val_images, val_labels) in val_loop:
            torch.cuda.empty_cache()
            val_images = val_images.to(device)
            # Same device-agnostic float conversion as in the training pass.
            val_labels = val_labels.to(device).reshape(-1, 1).float()

            val_outputs = model(val_images)
            y_true.append(val_labels.detach().cpu().numpy())
            y_pred.append(val_outputs.detach().cpu().numpy())
            curr_val_loss = criterion(val_outputs, val_labels)
            val_loss += curr_val_loss.item()
            val_loop.set_postfix(validation_loss = curr_val_loss.item())

    # Mean of the per-batch losses over the epoch.
    train_loss_after_epoch = train_loss / t_steps
    val_loss_after_epoch = val_loss / v_steps
    train_losses.append(train_loss_after_epoch)
    val_losses.append(val_loss_after_epoch)
    y_true = np.concatenate(y_true)
    y_pred = np.concatenate(y_pred)
    val_eer = EER(y_true, y_pred)
    print(f'Epoch : {epoch + 1} Training loss : {train_loss_after_epoch} Train EER : {train_eer} Validation loss : {val_loss_after_epoch}  Val EER : {val_eer}')

  0%|          | 0/794 [00:00<?, ?it/s]

  0%|          | 0/25 [00:00<?, ?it/s]

Epoch : 1 Training loss : 0.12864567750070235 Train EER : 0.1003875968992248 Validation loss : 0.12271040469408036  Val EER : 0.0773155416012559


  0%|          | 0/794 [00:00<?, ?it/s]

  0%|          | 0/25 [00:00<?, ?it/s]

Epoch : 2 Training loss : 0.051981618293395065 Train EER : 0.03178294573643411 Validation loss : 0.09868477076292038  Val EER : 0.054945054945054944


  0%|          | 0/794 [00:00<?, ?it/s]

  0%|          | 0/25 [00:00<?, ?it/s]

Epoch : 3 Training loss : 0.03407076204560553 Train EER : 0.02131782945736434 Validation loss : 0.05574857696890831  Val EER : 0.034929356357927786


  0%|          | 0/794 [00:00<?, ?it/s]

  0%|          | 0/25 [00:00<?, ?it/s]

Epoch : 4 Training loss : 0.031974596451985256 Train EER : 0.020155038759689922 Validation loss : 0.09447519809007644  Val EER : 0.047880690737833596


  0%|          | 0/794 [00:00<?, ?it/s]

  0%|          | 0/25 [00:00<?, ?it/s]

Epoch : 5 Training loss : 0.019939987593484684 Train EER : 0.012015503875968992 Validation loss : 0.10678463399410248  Val EER : 0.054552590266875985


  0%|          | 0/794 [00:00<?, ?it/s]

In [None]:
# Plot the per-epoch mean training and validation losses collected by the training loop.
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.show()

In [None]:
# Persist the trained weights (state dict only) to Resnet.pt in the working directory.
# The original line had unbalanced quotes ("Resnet".pt') and did not parse.
torch.save(model.state_dict(), "Resnet.pt")

In [None]:
# Evaluate the trained model on the test loader, collecting per-batch scores
# and labels for the metric cells that follow.
new_outputs = []
new_labels = []
model.eval()
test_loss = 0.0
with torch.no_grad():
    test_loop = tqdm(enumerate(test_loader), total = len(test_loader))
    for test_batch_idx, (test_images, test_labels) in test_loop:
        torch.cuda.empty_cache()
        test_images = test_images.to(device)
        # BCELoss needs float targets shaped like the (batch, 1) model output.
        # .float() keeps the tensor on `device`, so this works on CPU as well
        # as on CUDA (torch.cuda.FloatTensor would fail without a GPU).
        test_labels = test_labels.to(device).reshape(-1, 1).float()

        test_outputs = model(test_images)
        new_outputs.append(test_outputs.cpu().numpy())
        new_labels.append(test_labels.cpu().numpy())
        curr_test_loss = criterion(test_outputs, test_labels)
        test_loss += curr_test_loss.item()  # summed over batches, not averaged
        test_loop.set_postfix(test_loss = curr_test_loss.item())

In [None]:
# Stack the per-batch arrays collected by the test loop into one array each.
# NOTE: this rebinds the notebook-level name `labels`.
labels = np.concatenate(new_labels)
outputs = np.concatenate(new_outputs)
print(labels.shape, outputs.shape)

In [None]:
# Area under the ROC curve of the test-set scores.
score = roc_auc_score(labels, outputs)
score

In [None]:
# Draw the ROC curve of the test-set scores.
RocCurveDisplay.from_predictions(labels, outputs)

In [None]:
def convert_into_whole(outputs):
    """Threshold scores into hard 0/1 predictions.

    Args:
        outputs: iterable of scores.

    Returns:
        list[int]: 1 for every score strictly greater than 0.5, otherwise 0.
    """
    return [1 if score > 0.5 else 0 for score in outputs]

# Hard 0/1 predictions for the test set.
# NOTE: `new_outputs` previously held the per-batch score arrays; it is rebound here.
new_outputs = convert_into_whole(outputs)
new_outputs = np.array(new_outputs)

In [None]:
# Per-class precision/recall/F1 and the confusion matrix of the thresholded predictions.
print(classification_report(labels, new_outputs))
ConfusionMatrixDisplay.from_predictions(labels, new_outputs)

In [None]:
# Equal error rate on the test set. EER is derived from the ROC curve, so it
# must be computed from the continuous scores (`outputs`), as the training loop
# does, not from the already-thresholded 0/1 predictions in `new_outputs`.
EER(labels, outputs)

In [None]:
import torchaudio
import torch

# Path of the audio file to classify (placeholder; point it at a real file).
audio_file_path = '테스트할_오디오_파일.wav'

# Load the audio file.
waveform, sample_rate = torchaudio.load(audio_file_path)

# Apply the same preprocessing the ASVSpoof dataset applies before the model
# sees a clip: one channel, exactly `num_samples` samples, mel spectrogram.
# (The original cell called an undefined `compute_mfcc`, which produced
# features the model was not trained on.)
target_sample_rate = mel_spectogram.sample_rate
if sample_rate != target_sample_rate:
    # Arbitrary input files may not be at the rate the transform was configured for.
    waveform = torchaudio.functional.resample(waveform, sample_rate, target_sample_rate)
if waveform.shape[0] > 1:
    waveform = torch.mean(waveform, dim=0, keepdim=True)  # mix down to one channel
waveform = waveform[:, :num_samples]  # cut if longer
if waveform.shape[1] < num_samples:
    # zero-pad on the right if shorter
    waveform = torch.nn.functional.pad(waveform, (0, num_samples - waveform.shape[1]))
mel_features = mel_spectogram(waveform)

# Add the batch dimension and move to the model's device as float32
# (device-agnostic; torch.cuda.FloatTensor would fail on CPU).
input_tensor = mel_features.unsqueeze(0).to(device).float()

# Run the prediction.
model.eval()
with torch.no_grad():
    output = model(input_tensor)

# The model emits a single sigmoid value, so argmax over it is always 0.
# Threshold at 0.5 instead, consistent with convert_into_whole.
predicted_class = int(output.item() > 0.5)
print("Predicted class:", predicted_class)