In [1]:
import torch
# Sanity check: report whether PyTorch can see a CUDA device, then confirm the cell finished.
print(torch.cuda.is_available())
print("Done!")


True
Done!


In [2]:
def get_active_device():
    """Return the CUDA device when PyTorch reports one as available, else the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
# Resolve the compute device for this session and show which one was picked.
active_device = get_active_device()
print(active_device)

cuda


In [3]:
# Configuration Parameters
# A random sub-sample of the LA data was used for this experiment.
# The LA data was used, following most of the academic literature
# that deals with this dataset.
# The sub-sampling was done because of the very long running times.
# The dataset was split as follows:
# Train Set - 80% (25k -> 20k)
# Dev Set   - 40% (25k -> 10k)
# Eval Set  - 14% (70k -> 10k)

# Single configuration dictionary read by the dataset, the model, the optimizer
# factory and the trainer.
config = {
    # Per-split protocol file and audio folder. The dataset looks them up as
    # "<split>_protocol" and "<split>_audio_folder" (split = train / dev / eval).
    "train_protocol":"drive/MyDrive/AntiSpoofing/sub_sample/train_protocol.txt",
    "dev_protocol":"drive/MyDrive/AntiSpoofing/sub_sample/dev_protocol.txt",
    "eval_protocol":"drive/MyDrive/AntiSpoofing/sub_sample/eval_protocol.txt",
    "train_audio_folder":"drive/MyDrive/AntiSpoofing/sub_sample/train/",
    "dev_audio_folder":"drive/MyDrive/AntiSpoofing/sub_sample/dev/",
    "eval_audio_folder":"drive/MyDrive/AntiSpoofing/sub_sample/eval/",
    # Number of samples every utterance is cropped / tiled to before feature extraction.
    "max_speech_length":64600,
    # Training loop: mini-batch size and maximum number of epochs.
    "batch_size": 32,
    "num_epochs": 15,
    # Epochs numbered below this value are skipped when tracking the best EER.
    "min_valid_epochs":3,
    # Training stops once the average train loss has gone more than this many
    # consecutive epochs without reaching a new minimum.
    "early_stop_max_no_imp":3,
    # Optimizer choice ("adadelta", "sgd" or "adam") and its hyper-parameters:
    # rho is read for adadelta only; beta_one / beta_two / eps for adam only.
    "optimizer_func":"adadelta",
    "learning_rate":0.05,
    "rho":0.95,
    "beta_one":0,
    "beta_two":0.98,
    "eps":0.00000001,
    # LFCC front-end: passed as n_fft, hop_length, sample_rate and n_lfcc.
    "fft_frame_size":360,
    "hop_length":120,
    "sample_rate":16000,
    # Also the number of input channels of every convolution in the CNN.
    "lfcc_size":256,
    # CNN: output channels per convolution, one Conv1d per kernel size, dropout probability.
    "filter_size":256,
    "kernels":[3,4,5,6,7],
    "dropout":0.5
}


In [5]:
# Data Loading utilities
import os

import numpy as np
import soundfile as sf
import torchaudio
from torch import Tensor
from torch.utils.data import Dataset

## Adapted from "Hemlata Tak, Jee-weon Jung - tak@eurecom.fr, jeeweon.jung@navercorp.com"

# Column indices into a whitespace-split protocol line.
# AUDIO_FILE_FIELD and LABEL_FIELD are read by the dataset class;
# ATTACK_TYPE_FIELD is presumably the attack-type column (not read in the code shown here).
AUDIO_FILE_FIELD = 1
ATTACK_TYPE_FIELD = 3
LABEL_FIELD = 4
# Protocol label string -> integer class id.
LABELS_MAP = {"bonafide":1, "spoof":0}
# Utterance length in samples; same value as the default max_len of the pad helpers.
MAX_SPEECH_LENGTH = 64600

def pad(x, max_len=64600):
    """Force a signal to exactly ``max_len`` samples, deterministically.

    A signal that is already long enough is cut down to its first ``max_len``
    samples; a shorter one is repeated back-to-back and then cut.
    """
    n_samples = x.shape[0]
    if n_samples < max_len:
        # Too short: tile enough copies to cover max_len, then trim the excess.
        reps = 1 + int(max_len / n_samples)
        tiled = np.tile(x, (1, reps))
        return tiled[0, :max_len]
    return x[:max_len]


def pad_random(x: np.ndarray, max_len: int = 64600):
    """Return exactly ``max_len`` samples of ``x``, using a random crop when possible.

    Args:
        x: Signal array; its first axis is treated as the sample axis.
        max_len: Required number of samples in the result.

    Returns:
        A window of ``max_len`` consecutive samples starting at a uniformly
        random offset when ``x`` is long enough (the whole signal when the
        lengths are equal); otherwise ``x`` repeated back-to-back and trimmed
        to ``max_len``.

    Raises:
        ValueError: if ``x`` is empty and therefore cannot be tiled.
    """
    x_len = x.shape[0]
    # if duration is already long enough
    if x_len >= max_len:
        # randint's upper bound is exclusive, hence the +1: this makes the last
        # valid start offset reachable and keeps x_len == max_len from raising.
        stt = np.random.randint(x_len - max_len + 1)
        return x[stt:stt + max_len]

    if x_len == 0:
        raise ValueError("pad_random: cannot pad an empty signal")

    # if too short
    num_repeats = int(max_len / x_len) + 1
    padded_x = np.tile(x, (num_repeats))[:max_len]
    return padded_x

class Dataset_ASVspoof2019(Dataset):
    """Map-style dataset yielding ``(lfcc, label)`` pairs for one data split.

    The split's protocol file is read once at construction time. Each non-blank
    line is split on whitespace; column ``AUDIO_FILE_FIELD`` holds the audio file
    id and column ``LABEL_FIELD`` holds a key of ``LABELS_MAP``. Audio is loaded
    lazily, one ``.flac`` file per item.
    """

    def __init__(self, config: dict, sample_name: str):
        """Read the protocol of split ``sample_name`` and build the LFCC transform.

        Args:
            config: Must provide ``"<sample_name>_protocol"``,
                ``"<sample_name>_audio_folder"``, ``max_speech_length``,
                ``fft_frame_size``, ``hop_length``, ``sample_rate`` and ``lfcc_size``.
            sample_name: Split prefix used to build the config keys (e.g. ``"train"``).
        """
        # Read the data set protocol file.
        protocol_file = config[sample_name + "_protocol"]
        audio_files_folder = config[sample_name + "_audio_folder"]
        with open(protocol_file, "r", encoding="utf-8") as f:
            # Skip blank lines (e.g. a trailing newline at the end of the file);
            # they would otherwise fail with an IndexError on the field lookups below.
            data_set_items = [line.split() for line in f if line.strip()]

        self.max_speech_length = config['max_speech_length']
        self.audio_files = [x[AUDIO_FILE_FIELD] for x in data_set_items]
        self.labels = [LABELS_MAP[x[LABEL_FIELD]] for x in data_set_items]
        self.audio_files_folder = audio_files_folder
        self.speckwargs={"n_fft": config['fft_frame_size'], "hop_length": config['hop_length'], "center": False}
        self.transform = torchaudio.transforms.LFCC(sample_rate=config['sample_rate'], n_lfcc=config['lfcc_size'], speckwargs=self.speckwargs)

    def __len__(self):
        """Number of utterances listed in the protocol file."""
        return len(self.audio_files)

    def __getitem__(self, index):
        """Load one utterance and return ``(lfcc, label)``.

        The waveform is cropped at a random offset (or tiled when too short) to
        ``max_speech_length`` samples, so repeated reads of the same index can
        return different crops.
        """
        audio_file = self.audio_files[index]
        # os.path.join works whether or not the configured folder ends with a separator.
        audio_path = os.path.join(self.audio_files_folder, audio_file + ".flac")
        signal, _ = sf.read(audio_path)
        padded_signal = pad_random(signal, self.max_speech_length)
        tensor_signal = Tensor(padded_signal)
        lfcc = self.transform(tensor_signal)
        y = self.labels[index]
        return lfcc, y


In [7]:
# Load data example.
from torch.utils.data import DataLoader

# Build the training dataset and a shuffling loader on top of it.
ds = Dataset_ASVspoof2019(config, "train")
dl = DataLoader(
    ds,
    batch_size=config['batch_size'],
    shuffle=True,
    drop_last=False,
    pin_memory=True,
)

# Dataset view: size plus the first (features, label) item only.
print("DataSet:")
print(f"#Items: {len(ds)!s}")
for signal, label in ds:
    break

print("Signal:")
print(signal.shape)
print(signal[:5, :5])
print(f"Label: {label!s}")

# Loader view: number of batches plus the first batch only.
print("\nDataLoader:")
print(f"#Items: {len(dl)!s}")
for batch, label in dl:
    break

print(f"batch: {batch.shape!s}")
print(f"labels: {label[:10]!s}")

DataSet:
#Items: 20288
Signal:
torch.Size([256, 536])
tensor([[-515.9557, -515.7926, -516.3044, -516.5004, -518.4332],
        [   6.9788,    7.2079,    6.4883,    6.2107,    3.4784],
        [   6.9575,    7.1820,    6.4748,    6.1960,    3.4668],
        [   6.9223,    7.1391,    6.4525,    6.1716,    3.4476],
        [   6.8731,    7.0793,    6.4213,    6.1375,    3.4209]])
Label: 1

DataLoader:
#Items: 634
batch: torch.Size([32, 256, 536])
labels: tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])


In [8]:
# The CNN Model.
# This model was inspired by the paper:
#
# Siamese Convolutional Neural Network
# Using Gaussian Probability Feature for
# Spoofing Speech Detection
#
# of Zhenchun Lei, 2020.
#
# In the paper, though, the authors converted the LFCC vectors
# into GMM vectors, which obtained very good results.
# Due to limited resources, I settle for the plain LFCC vectors here.
#
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNN(nn.Module):
    """Multi-kernel 1-D convolutional classifier with global max pooling.

    One ``Conv1d`` branch is created per entry of ``config['kernels']``. Each
    branch output is passed through ReLU and max-pooled over its whole length;
    the pooled vectors are concatenated, passed through dropout and fed to a
    single linear layer that produces the class logits.
    """

    def __init__(self,
                 config: dict,
                 num_classes):
        """Create the convolution branches, the dropout and the output layer.

        Reads ``filter_size``, ``kernels``, ``lfcc_size`` and ``dropout`` from ``config``.
        """
        super(CNN, self).__init__()

        print('allocate convolution  layers')
        n_filters = config['filter_size']
        kernel_sizes = config['kernels']

        branches = []
        for kernel_size in kernel_sizes:
            branches.append(
                nn.Conv1d(in_channels=config['lfcc_size'],
                          out_channels=n_filters,
                          kernel_size=kernel_size)
            )
        self.convs = nn.ModuleList(branches)

        # One pooled vector of n_filters values per branch feeds the classifier.
        self.fc = nn.Linear(n_filters * len(kernel_sizes), num_classes)
        self.dropout = nn.Dropout(p=config['dropout'])

    def forward(self, signals):
        """Map ``signals`` of shape [batch, lfcc dim, length] to logits of shape [batch, #classes]."""
        pooled_per_branch = []
        for conv in self.convs:
            # activated = [batch, n_filters, length - kernel size + 1]
            activated = F.relu(conv(signals))
            # Pool over the entire remaining length -> [batch, n_filters, 1] -> [batch, n_filters]
            pooled = F.max_pool1d(activated, kernel_size=activated.shape[2])
            pooled_per_branch.append(pooled.squeeze(dim=2))

        # features = [batch, n_filters * #kernels]
        features = torch.cat(pooled_per_branch, dim=1)

        # logits = [batch, #classes]
        return self.fc(self.dropout(features))


In [11]:
# EER computation function

import numpy as np
import torch
import time
from sklearn import metrics
from torch.utils.data import DataLoader

def compute_eer(model: torch.nn.Module, test_set: DataLoader) -> tuple:
    """Compute the equal error rate (EER) of ``model`` over ``test_set``.

    Each sample is scored as ``softmax(logits)[1] - softmax(logits)[0]``, i.e. a
    value in [-1, 1] that grows with the probability of class 1. The EER is taken
    at the ROC point where the false-positive and false-negative rates are closest,
    and is reported as the mean of the two rates at that point.

    Args:
        model: Classifier returning two-class logits of shape [batch, 2].
        test_set: Iterable of ``(signals, labels)`` batches.

    Returns:
        ``(eer_percent, elapsed_seconds)``.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    t0 = time.time()

    # Run on the device the model actually lives on; fall back to the
    # CUDA-if-available choice only for a model without parameters.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        active_device = first_param.device
    else:
        active_device = (torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu'))

    model.eval()
    scores = []
    targets = []
    with torch.no_grad():
        for signals, labels in test_set:
            logits = model(signals.to(active_device))
            probs = torch.softmax(logits, dim=1)
            # Keep the full signed score. Clipping negatives to 0 would merge every
            # sample that leans towards class 0 into a single tie and remove the ROC
            # thresholds in that region, distorting the EER.
            curr_scores = (probs[:, 1] - probs[:, 0]).detach().cpu().numpy()

            scores.append(curr_scores)
            # Labels are only needed on the host, so they are never moved to the device.
            targets.append(labels.detach().cpu().numpy())

    scores = np.concatenate(scores, axis=0)
    targets = np.concatenate(targets, axis=0)
    fpr, tpr, _ = metrics.roc_curve(targets, scores)
    fnr = 1 - tpr
    eer_index = np.nanargmin(np.absolute(fpr - fnr))

    return np.mean((fpr[eer_index], fnr[eer_index]))*100, (time.time() - t0)

In [10]:
import torch.optim as optim

def get_optimizer(parameters, config: dict):
    """Build the optimizer selected by ``config['optimizer_func']``.

    Args:
        parameters: Parameters to optimize (e.g. ``model.parameters()``).
        config: Must provide ``optimizer_func`` and ``learning_rate``, plus
            ``rho`` for ``"adadelta"`` or ``beta_one`` / ``beta_two`` / ``eps``
            for ``"adam"``. ``"sgd"`` needs only the learning rate.

    Returns:
        The configured ``torch.optim`` optimizer.

    Raises:
        ValueError: if ``optimizer_func`` is not one of the supported names.
            Failing here is clearer than returning ``None`` and crashing on the
            first ``optimizer.zero_grad()`` call.
    """
    opt_name = config['optimizer_func']
    if opt_name == "adadelta":
        return optim.Adadelta(parameters,
                              lr=config['learning_rate'],
                              rho=config['rho'])
    if opt_name == 'sgd':
        return optim.SGD(parameters, lr=config['learning_rate'])
    if opt_name == "adam":
        return optim.Adam(parameters,
                          lr=config['learning_rate'],
                          betas=(config['beta_one'], config['beta_two'],),
                          eps=config['eps'])

    raise ValueError('Wrong optimizer name: ' + str(opt_name))


In [13]:
import torch
import numpy as np
import time
import copy
from torch.utils.data import DataLoader
import torch.optim as optim

# Initial value for the best-EER trackers: larger than any EER expressed in percent.
DEFAULT_MAX_EER = 1000
# Seed value for random number generators.
SEED = 42

class Trainer:
    """Trains the CNN classifier and tracks the best epochs by equal error rate."""

    def __init__(self):
        # Prefer the GPU when PyTorch reports one as available.
        self.active_device = (torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu'))

    def train(self, config: dict) -> tuple:
        """Run the training loop described by ``config``.

        After every epoch the EER is computed on the dev, train and eval sets and
        a table row is printed. Training stops early once the average train loss
        has gone more than ``early_stop_max_no_imp`` consecutive epochs without a
        new minimum. Best-EER tracking only starts at epoch ``min_valid_epochs``.

        Args:
            config: Experiment configuration (data paths, model, optimizer and
                loop settings).

        Returns:
            ``(last_model, best_dev_model, best_dev_epoch)``. ``best_dev_model``
            is a deep copy taken at the epoch with the lowest dev EER; it is
            ``None`` (and the epoch ``-1``) when no epoch qualified.

        Raises:
            ValueError: if no optimizer could be built for ``config``.
        """
        print('CNN trainer - start')

        # Apply the module-level seed so weight initialisation, batch shuffling
        # and the random crops drawn through numpy start from the same RNG state
        # on every call.
        torch.manual_seed(SEED)
        np.random.seed(SEED)

        pending_model = CNN(config, 2)
        pending_model = pending_model.to(self.active_device)
        optimal_model = None

        print("Load samples")
        train_dataset = Dataset_ASVspoof2019(config, "train")
        dev_dataset = Dataset_ASVspoof2019(config, "dev")
        eval_dataset = Dataset_ASVspoof2019(config, "eval")

        train_loader = DataLoader(train_dataset,
                                  batch_size=config['batch_size'],
                                  shuffle=True,
                                  drop_last=True)

        dev_loader = DataLoader(dev_dataset,
                                batch_size=config['batch_size'],
                                shuffle=False,
                                drop_last=False)

        eval_loader = DataLoader(eval_dataset,
                                 batch_size=config['batch_size'],
                                 shuffle=False,
                                 drop_last=False)

        print('set optimizer & loss')
        optimizer = get_optimizer(pending_model.parameters(), config)
        if optimizer is None:
            # Fail here rather than on the first optimizer.zero_grad() call.
            raise ValueError('Wrong optimizer name: ' + str(config.get('optimizer_func')))
        # Per-class loss weights: class index 0 gets 0.1, class index 1 gets 0.9.
        weight = torch.FloatTensor([0.1, 0.9]).to(self.active_device)
        criterion = torch.nn.CrossEntropyLoss(weight=weight)

        best_dev_eer = DEFAULT_MAX_EER
        best_dev_epoch = -1
        best_eval_eer = DEFAULT_MAX_EER
        best_eval_epoch = -1

        print('start training loops. #epochs = ' + str(config['num_epochs']))
        print(f"{'Epoch':^7} | {'Train Loss':^12} | {'Train EER':^11} | {'Dev EER':^10} | {'Eval EER':^9} | {'Elapsed':^9}")
        print("-"*50)

        # Any finite first-epoch loss counts as an improvement.
        min_loss = float('inf')
        num_no_imp = 0
        for i in range(config['num_epochs']):
            epoch = i + 1
            epoch_start_time = time.time()
            total_loss = 0
            num_batches = 0

            # compute_eer leaves the model in eval mode, so re-enable training mode.
            pending_model.train()
            for signals, labels in train_loader:
                signals = signals.to(self.active_device)
                labels = labels.to(self.active_device)
                logits = pending_model(signals)

                optimizer.zero_grad()
                loss = criterion(logits, labels)
                total_loss += loss.item()
                num_batches += 1
                loss.backward()
                optimizer.step()

            avg_loss = total_loss / num_batches
            # Elapsed time covers the optimisation pass only, not the evaluations below.
            epoch_time = time.time() - epoch_start_time

            # Validation test.
            dev_eer, _ = compute_eer(pending_model, dev_loader)
            train_eer, _ = compute_eer(pending_model, train_loader)
            eval_eer, _ = compute_eer(pending_model, eval_loader)
            # Column widths match the header printed above.
            print(f"{epoch:^7} | {avg_loss:^12.6f} | {train_eer:^11.2f} | {dev_eer:^10.2f} | {eval_eer:^9.4f} | {epoch_time:^9.2f}")

            if avg_loss < min_loss:
                min_loss = avg_loss
                num_no_imp = 0
            else:
                num_no_imp += 1

            if num_no_imp > config["early_stop_max_no_imp"]:
                print('early stop exit')
                break

            # Early epochs are not eligible for best-EER tracking.
            if epoch < config["min_valid_epochs"]:
                continue

            if dev_eer < best_dev_eer:
                best_dev_eer = dev_eer
                best_dev_epoch = epoch
                # Snapshot the weights; pending_model keeps training afterwards.
                optimal_model = copy.deepcopy(pending_model)

            if eval_eer < best_eval_eer:
                best_eval_eer = eval_eer
                best_eval_epoch = epoch

        print('CNN trainer - end\n')
        print("Best Dev EER = {:.2f}".format(best_dev_eer) + ", best epoch = " + str(best_dev_epoch))
        print("Best Eval EER = {:.2f}".format(best_eval_eer) + ", best epoch = " + str(best_eval_epoch))
        return pending_model, optimal_model, best_dev_epoch


In [None]:
# Run the full training loop. train() returns the model after the last epoch,
# the copy taken at the best dev-EER epoch, and that epoch's number.
trainer = Trainer()
last_epoch_model, best_dev_eer_model, best_dev_eer_epoch = trainer.train(config)

CNN trainer - start
allocate convolution  layers
Load samples
set optimizer & loss
start training loops. #epochs = 15
 Epoch  |  Train Loss  |  Train EER  |  Dev EER   | Eval EER  |  Elapsed 
--------------------------------------------------
