# Libraries

In [3]:
#!pip install spafe

In [6]:
import argparse
from DL.dataset import SoundFeatureDataset 
import pandas as pd
import torch.nn.functional as F
import os
from torch.utils.data import DataLoader
from tqdm import tqdm
import soundfile as sf
import numpy as np
from spafe.utils.preprocessing import pre_emphasis, framing, windowing, zero_handling
from numpy import log, exp, infty, zeros_like, vstack, zeros, errstate, finfo, sqrt, floor, tile, concatenate, arange, \
    meshgrid, ceil, linspace
from scipy.interpolate import interpn
from scipy.special import logsumexp
from scipy.signal import lfilter
from scipy.fft import dct
import librosa
from torch import Tensor
from torchvision import transforms


# Utils

In [7]:
def envelope(y, rate, threshold):
    """Return a per-sample activity mask based on a smoothed amplitude envelope.

    The absolute value of ``y`` is averaged with a centred rolling window of
    ``int(rate / 10)`` samples (``min_periods=1`` so the edges are still
    evaluated), and each sample is flagged ``True`` when that local mean is
    strictly greater than ``threshold``.

    Args:
        y: 1-D sequence of samples accepted by ``pd.Series``.
        rate: sampling rate; one tenth of it is used as the window length.
        threshold: amplitude level the local mean has to exceed.

    Returns:
        list of bool, one entry per sample of ``y``.
    """
    magnitude = pd.Series(y).apply(np.abs)
    window = int(rate / 10)
    smoothed = magnitude.rolling(window=window, min_periods=1, center=True).mean()
    return [bool(level > threshold) for level in smoothed]


def clean_audio(file_path):
    """Load an audio file, drop low-energy samples and keep at most one second.

    The file is read with ``soundfile``; samples whose smoothed envelope (see
    ``envelope``) does not exceed 0.0005 are removed, and the remainder is
    truncated to the first ``rate`` samples.

    Args:
        file_path: path of the audio file to read.

    Returns:
        tuple ``(signal, rate)`` with the trimmed samples and the sampling rate
        reported by ``soundfile``.
    """
    audio, sample_rate = sf.read(file_path)
    active = envelope(audio, sample_rate, 0.0005)
    # Boolean-mask first, then cap the length at `sample_rate` samples.
    trimmed = audio[active][:sample_rate]
    return trimmed, sample_rate


def pad(x, max_len=64000):
    """Crop or loop a signal so that it has exactly ``max_len`` samples.

    Signals that are long enough are truncated. Shorter signals are repeated
    end-to-end (the whole waveform is looped, not each individual sample) and
    then truncated to ``max_len``.

    Args:
        x: array whose first axis is the sample axis.
        max_len: number of samples in the result.

    Returns:
        Array with ``max_len`` entries along the first axis.

    Raises:
        ValueError: if ``x`` is empty and ``max_len`` is positive, since there
            is nothing to repeat.
    """
    x_len = x.shape[0]
    if x_len >= max_len:
        return x[:max_len]
    if x_len == 0:
        raise ValueError("cannot pad an empty signal")
    # Integer repeat count: a float count is rejected by NumPy.
    num_repeats = max_len // x_len + 1
    # Tile only along the sample axis so multi-channel input keeps its layout.
    reps = (num_repeats,) + (1,) * (x.ndim - 1)
    return np.tile(x, reps)[:max_len]
def get_log_spectrum(x):
    """Return the power spectrogram of ``x`` converted to decibels.

    The STFT uses a 2048-point FFT, a 2048-sample window and a hop of 512
    samples; the squared magnitude is passed to ``librosa.power_to_db``.

    Args:
        x: audio signal accepted by ``librosa.core.stft``.

    Returns:
        The dB-scaled power spectrogram returned by ``librosa.power_to_db``.
    """
    stft_matrix = librosa.core.stft(x, n_fft=2048, win_length=2048, hop_length=512)
    power = np.abs(stft_matrix) ** 2
    return librosa.power_to_db(power)


# LFCC

In [8]:
from spafe.utils.exceptions import ErrorMsgs

def linear_filter_banks(nfilts=20,
                        nfft=512,
                        fs=16000,
                        low_freq=None,
                        high_freq=None,
                        scale="constant"):
    """
    Compute linear-filterbanks. The filters are stored in the rows, the columns
    correspond to fft bins.

    Args:
        nfilts    (int) : the number of filters in the filterbank.
                          (Default 20)
        nfft      (int) : the FFT size.
                          (Default is 512)
        fs        (int) : sample rate/ sampling frequency of the signal.
                          (Default 16000 Hz)
        low_freq  (int) : lowest band edge of linear filters.
                          (Default 0 Hz)
        high_freq (int) : highest band edge of linear filters.
                          (Default samplerate/2)
        scale    (str)  : "constant" keeps every filter at peak gain 1,
                          "descendant" lowers the gain by 1/nfilts per filter
                          (starting from 1), "ascendant" raises it by 1/nfilts
                          per filter (starting from 0). Any other value leaves
                          the gain at 0, i.e. an all-zero filterbank.
                          Default is "constant".

    Returns:
        (numpy array) array of size nfilts * (nfft/2 + 1) containing filterbank.
        Each row holds 1 filter.
    """
    # init freqs (falsy values, including 0, fall back to the defaults)
    high_freq = high_freq or fs / 2
    low_freq = low_freq or 0

    # compute points evenly spaced in frequency (points are in Hz)
    linear_points = np.linspace(low_freq, high_freq, nfilts + 2)

    # we use fft bins, so we have to convert from Hz to fft bin number
    bins = np.floor((nfft + 1) * linear_points / fs)
    fbank = np.zeros([nfilts, nfft // 2 + 1])

    # init scaler
    c = 1 if scale in ("descendant", "constant") else 0

    # compute amps of fbanks
    for j in range(nfilts):
        b0, b1, b2 = bins[j], bins[j + 1], bins[j + 2]

        # compute scaler, clamped to [0, 1]. A plain min/max is used so that a
        # gain landing exactly on the bound is kept instead of collapsing to 0.
        if scale == "descendant":
            c = max(c - 1 / nfilts, 0)
        elif scale == "ascendant":
            c = min(c + 1 / nfilts, 1)

        lo, mid, hi = int(b0), int(b1), int(b2)

        # rising edge of the triangle, then falling edge
        fbank[j, lo:mid] = c * (np.arange(lo, mid) - lo) / (b1 - b0)
        fbank[j, mid:hi] = c * (hi - np.arange(mid, hi)) / (b2 - b1)

    return np.abs(fbank)

def lfcc(sig,
         fs=16000,
         num_ceps=20,
         pre_emph=0,
         pre_emph_coeff=0.97,
         win_len=0.030,
         win_hop=0.015,
         win_type="hamming",
         nfilts=70,
         nfft=1024,
         low_freq=None,
         high_freq=None,
         scale="constant",
         dct_type=2,
         normalize=0):
    """
    Compute linear-frequency cepstral coefficients (LFCC) from an audio signal.

    Pipeline: optional pre-emphasis -> framing -> windowing -> power spectrum
    -> linear filterbank -> log10 -> orthonormal DCT -> first ``num_ceps``
    coefficients.

    Args:
        sig            (array) : audio signal from which to compute features.
        fs               (int) : the sampling frequency of the signal.
                                 Default is 16000.
        num_ceps         (int) : number of cepstra to return.
                                 Default is 20.
        pre_emph         (int) : apply pre-emphasis if truthy.
                                 Default is 0.
        pre_emph_coeff (float) : pre-emphasis filter coefficient.
                                 Default is 0.97.
        win_len        (float) : window length in sec.
                                 Default is 0.030.
        win_hop        (float) : step between successive windows in sec.
                                 Default is 0.015.
        win_type         (str) : window type to apply for the windowing.
                                 Default is "hamming".
        nfilts           (int) : the number of filters in the filterbank.
                                 Default is 70.
        nfft             (int) : number of FFT points.
                                 Default is 1024.
        low_freq         (int) : lowest band edge of the linear filters (Hz).
                                 Default is 0.
        high_freq        (int) : highest band edge of the linear filters (Hz).
                                 Default is fs / 2.
        scale           (str)  : choose if max bins amplitudes ascend, descend or are constant (=1).
                                 Default is "constant".
        dct_type         (int) : DCT type forwarded to ``scipy.fft.dct``.
                                 Default is 2.
        normalize        (int) : accepted for interface compatibility; it is
                                 not used by this implementation.
    Returns:
        (array) : 2d array of LFCC features (num_frames x num_ceps)
    """
    # Band edges: falsy values fall back to the full 0 .. fs/2 range.
    high_freq = high_freq or fs / 2
    low_freq = low_freq or 0

    # The parameter checks below are disabled: they sit inside an inert string
    # literal and are never executed.
    """
    if low_freq < 0:
        raise ParameterError(ErrorMsgs["low_freq"])
    if high_freq > (fs / 2):
        raise ParameterError(ErrorMsgs["high_freq"])
    if nfilts < num_ceps:
        raise ParameterError(ErrorMsgs["nfilts"])
"""
    # Optional pre-emphasis
    emphasized = pre_emphasis(sig=sig, pre_emph_coeff=pre_emph_coeff) if pre_emph else sig

    # Framing followed by windowing
    frames, frame_length = framing(sig=emphasized,
                                   fs=fs,
                                   win_len=win_len,
                                   win_hop=win_hop)
    windowed = windowing(frames=frames,
                         frame_len=frame_length,
                         win_type=win_type)

    # Power spectrum of every frame
    power_spectrum = np.abs(np.fft.rfft(windowed, nfft)) ** 2

    # Energy per linear filterbank channel
    fbank = linear_filter_banks(nfilts=nfilts,
                                nfft=nfft,
                                fs=fs,
                                low_freq=low_freq,
                                high_freq=high_freq,
                                scale=scale)
    band_energies = np.dot(power_spectrum, fbank.T)

    # Small offset keeps log10 finite for empty bands
    log_energies = np.log10(band_energies + 2.2204e-16)

    # Decorrelate with the DCT and keep the leading coefficients
    cepstra = dct(log_energies, type=dct_type, norm='ortho', axis=1)
    return cepstra[:, :num_ceps]





def Deltas(x, width=3):
    """Compute delta features along the last axis of a 2-D feature matrix.

    The matrix is extended on both sides by repeating its first and last
    column ``width // 2`` times, filtered along the last axis with the FIR
    taps ``[h, h-1, ..., -h]`` (``h = width // 2``), and the leading ``2*h``
    filter outputs are dropped so the result has the same number of columns
    as ``x``.

    Args:
        x: 2-D array; the filter runs along its second axis.
        width: filter length; only ``width // 2`` is used.

    Returns:
        2-D array with the same shape as ``x``.
    """
    half = int(width // 2)
    taps = list(range(half, -half - 1, -1))
    # Edge replication: `half` copies of the first / last column.
    left_edge = tile(x[:, 0], (1, half)).reshape(half, -1).T
    right_edge = tile(x[:, -1], (1, half)).reshape(half, -1).T
    extended = concatenate([left_edge, x, right_edge], axis=-1)
    filtered = lfilter(taps, 1, extended)
    return filtered[:, 2 * half:]


def extract_lfcc(sig,fs=16000, num_ceps=20, order_deltas=2, low_freq=0, high_freq=4000):
    """Extract LFCCs and, optionally, their stacked delta features.

    Args:
        sig: audio signal forwarded to ``lfcc``.
        fs: sampling frequency forwarded to ``lfcc``.
        num_ceps: number of cepstral coefficients to keep.
        order_deltas: how many successive delta orders to append; values
            that are not greater than 0 return the static coefficients only.
        low_freq: lowest band edge forwarded to ``lfcc``.
        high_freq: highest band edge forwarded to ``lfcc``.

    Returns:
        Array with ``num_ceps * (order_deltas + 1)`` rows when deltas are
        requested (static block first, then each delta order), otherwise the
        transposed output of ``lfcc``.
    """
    # put VAD here, if wanted
    static = lfcc(sig=sig,
                  fs=fs,
                  num_ceps=num_ceps,
                  low_freq=low_freq,
                  high_freq=high_freq).T
    if not order_deltas > 0:
        return static

    # Each delta order is derived from the previously computed block.
    blocks = [static]
    for _ in range(order_deltas):
        blocks.append(Deltas(blocks[-1]))
    return vstack(blocks)

# Dataset

In [10]:
# Instantiate the training and development datasets from the project-local
# SoundFeatureDataset class (imported from DL.dataset).
# NOTE(review): the feature directory is '../DL/mfcc' although this notebook
# defines LFCC extraction above — confirm which feature set is intended.
# NOTE(review): is_logical=False presumably selects the physical-access (PA)
# track, matching the '../Data/PA' root — verify against SoundFeatureDataset.
train_set = SoundFeatureDataset('../Data/PA','../DL/mfcc',is_logical=False, is_train=True)
dev_set = SoundFeatureDataset('../Data/PA','../DL/mfcc',is_logical=False, is_train=False)

In [5]:
# import torch
# import collections
# import os
# import soundfile as sf
# import librosa
# from torch.utils.data import DataLoader, Dataset
# import numpy as np
# from joblib import Parallel, delayed
# import h5py
# import random
# 
# 
# ASVFile = collections.namedtuple('ASVFile',
#                                  ['speaker_id', 'file_name', 'path', 'sys_id', 'key'])
# 
# class ASVDataset(Dataset):
#     def __init__(self,data_root,transform=None, is_train=True, sample_size=None,
#                  is_logical=True, feature_name=None, is_eval=False,cache_name=None,random_sample=None,
#                  eval_part=0):
# 
# 
#         if is_logical:
#             track = 'LA'
#         else:
# 
#             track = 'PA'
#         assert feature_name is not None, 'must provide feature name'
#         self.track = track
#         self.is_logical = is_logical
#         self.prefix = 'ASVspoof2019_{}'.format(track)
#         v1_suffix = ''
#         if is_eval and track == 'PA':
#             v1_suffix = '_v1'
# 
#         self.is_eval = is_eval
#         self.data_root = data_root
#         self.dset_name = 'eval' if is_eval else 'train' if is_train else 'dev'
#         self.protocols_fname = 'eval.trl'.format(eval_part) if is_eval else 'train.trn' if is_train else 'dev.trl'
#         self.protocols_dir = os.path.join(self.data_root,
#                                           '{}_cm_protocols/'.format(self.prefix))
#         self.files_dir = os.path.join(self.data_root, '{}_{}'.format(
#             self.prefix, self.dset_name) + v1_suffix, 'flac')
#         self.protocols_fname = os.path.join(self.protocols_dir,
#                                             'ASVspoof2019.{}.cm.{}.txt'.format(track, self.protocols_fname))
#         self.cache_fname = 'cache_{}{}_{}_{}.npy'.format(self.dset_name,
#                                                          '_part{}'.format(eval_part) if is_eval else '', track,
#                                                          feature_name)
#         self.cache_matlab_fname = 'cache_{}{}_{}_{}.mat'.format(
#             self.dset_name, '_part{}'.format(eval_part) if is_eval else '',
#             track, feature_name)
#         self.transform = transform
#         self.random_length=random_sample
#         print(transform)
#         if os.path.exists(self.cache_fname):
#             self.data_x, self.data_y,self.data_filename = torch.load(self.cache_fname)
#             print('Dataset loaded from cache ', self.cache_fname)
#         elif feature_name == 'cqcc':
#             if os.path.exists(self.cache_matlab_fname):
#                 self.data_x, self.data_y, self.data_filename = self.read_matlab_cache(self.cache_matlab_fname)
#                 self.files_meta = self.parse_protocols_file(self.protocols_fname)
#                 print('Dataset loaded from matlab cache ', self.cache_matlab_fname)
#                 torch.save((self.data_x, self.data_y, self.data_filename, self.files_meta),
#                            self.cache_fname, pickle_protocol=4)
#                 print('Dataset saved to cache ', self.cache_fname)
#             else:
#                 print("Matlab cache for cqcc feature do not exist.")
#         else:
#             self.files_meta = self.parse_protocols_file(self.protocols_fname)
#             if self.random_length:
#                 random_files_meta = random.sample(self.files_meta, min(len(self.files_meta), self.random_length))
#                 data = list(map(self.read_file, random_files_meta))
#             else:
#                 data = list(map(self.read_file, self.files_meta))
#             self.data_x, self.data_y, self.data_filename = map(list, zip(*data))
#             if self.transform is not None:
#                 # self.data_x = list(map(self.transform, self.data_x))
#                 self.data_x = Parallel(n_jobs=4, prefer='threads')(delayed(self.transform)(x) for x in self.data_x)
#             torch.save((self.data_x, self.data_y,self.data_filename), self.cache_fname)
#             print('Dataset saved to cache ', self.cache_fname)
#         if sample_size:
#             select_idx = np.random.choice(len(self.files_meta), size=(sample_size,), replace=True).astype(np.int32)
#             self.files_meta = [self.files_meta[x] for x in select_idx]
#             self.data_x = [self.data_x[x] for x in select_idx]
#             self.data_y = [self.data_y[x] for x in select_idx]
#             self.data_sysid = [self.data_filename[x] for x in select_idx]
#         self.length = len(self.data_x)
# 
#     def __len__(self):
#         return self.length
# 
#     def __getitem__(self, idx):
#         x = self.data_x[idx]
#         y = self.data_y[idx]
#         return x, y,self.data_filename[idx]
# 
#     def read_file(self, meta):
#         data_x, sample_rate = sf.read(meta.path)
#         data_y = meta.key
#         return data_x, float(data_y), meta.file_name
# 
#     def _parse_line(self, line):
#         tokens = line.strip().split(' ')
#         return ASVFile(speaker_id=tokens[0],
#                        file_name=tokens[1],
#                        path=os.path.join(self.files_dir, tokens[1] + '.flac'),
#                        sys_id=0,
#                        key=int(tokens[4] == 'spoof'))
# 
#     def parse_protocols_file(self, protocols_fname):
#         lines = open(protocols_fname).readlines()
#         files_meta = map(self._parse_line, lines)
#         return list(files_meta)
# 
#     def read_matlab_cache(self, filepath):
#         f = h5py.File(filepath, 'r')
#         # filename_index = f["filename"]
#         # filename = []
#         data_x_index = f["data_x"]
#         sys_id_index = f["sys_id"]
#         data_x = []
#         data_y = f["data_y"][0]
#         sys_id = []
#         for i in range(0, data_x_index.shape[1]):
#             idx = data_x_index[0][i]  # data_x
#             temp = f[idx]
#             data_x.append(np.array(temp).transpose())
#             # idx = filename_index[0][i]  # filename
#             # temp = list(f[idx])
#             # temp_name = [chr(x[0]) for x in temp]
#             # filename.append(''.join(temp_name))
#             idx = sys_id_index[0][i]  # sys_id
#             temp = f[idx]
#             sys_id.append(int(list(temp)[0][0]))
#         data_x = np.array(data_x)
#         data_y = np.array(data_y)
#         return data_x.astype(np.float32), data_y.astype(np.int64), sys_id

# LOSS

In [11]:
import torch
import torch.nn as nn
from torch.autograd.function import Function
import torch.nn.functional as F
from torch.autograd import Variable

class OCSoftmax(nn.Module):
    """Margin-based loss around a single learned direction in feature space.

    Features and the learned ``center`` are L2-normalised, so the score of a
    sample is the cosine between the two. Samples labelled 0 are penalised
    when their score falls below ``r_real``; samples labelled 1 are penalised
    when their score rises above ``r_fake``. Other label values leave the
    score unchanged before the softplus.
    """

    def __init__(self, feat_dim=2, r_real=0.9, r_fake=0.5, alpha=20.0):
        """
        Args:
            feat_dim: dimensionality of the input features.
            r_real: score margin applied to label-0 samples.
            r_fake: score margin applied to label-1 samples.
            alpha: scale applied to the margin terms before the softplus.
        """
        super().__init__()
        self.feat_dim = feat_dim
        self.r_real = r_real
        self.r_fake = r_fake
        self.alpha = alpha
        # Drawn with randn and then re-initialised in place.
        self.center = nn.Parameter(torch.randn(1, self.feat_dim))
        nn.init.kaiming_uniform_(self.center, 0.25)
        self.softplus = nn.Softplus()

    def forward(self, x, labels):
        """
        Args:
            x: feature matrix with shape (batch_size, feat_dim).
            labels: ground truth labels with shape (batch_size).

        Returns:
            tuple ``(loss, scores)``: the scalar mean loss and the raw cosine
            scores with shape (batch_size).
        """
        direction = F.normalize(self.center, p=2, dim=1)
        embeddings = F.normalize(x, p=2, dim=1)

        cosine = torch.matmul(embeddings, direction.t())
        # Keep an untouched copy: the margins below are written in place.
        output_scores = cosine.clone()

        real_mask = labels == 0
        fake_mask = labels == 1
        cosine[real_mask] = self.r_real - cosine[real_mask]
        cosine[fake_mask] = cosine[fake_mask] - self.r_fake

        loss = self.softplus(self.alpha * cosine).mean()
        return loss, output_scores.squeeze(1)

class AMSoftmax(nn.Module):
    """Additive-margin cosine classifier head.

    Features and class centers are L2-normalised; the logits are their cosine
    similarities. For the margin logits, ``m`` is subtracted from the entry
    of the ground-truth class and the result is scaled by ``s``.
    """

    def __init__(self, num_classes, enc_dim, s=20, m=0.9):
        """
        Args:
            num_classes: number of classes (rows of the center matrix).
            enc_dim: dimensionality of the input features.
            s: scale applied to the margin logits.
            m: margin subtracted from the target-class cosine.
        """
        super(AMSoftmax, self).__init__()
        self.enc_dim = enc_dim
        self.num_classes = num_classes
        self.s = s
        self.m = m
        self.centers = nn.Parameter(torch.randn(num_classes, enc_dim))

    def forward(self, feat, label):
        """
        Args:
            feat: feature matrix with shape (batch_size, enc_dim).
            label: integer class indices with shape (batch_size), on the same
                device as ``feat``.

        Returns:
            tuple ``(logits, margin_logits)``, both (batch_size, num_classes).
        """
        norms = torch.norm(feat, p=2, dim=-1, keepdim=True)
        nfeat = torch.div(feat, norms)

        norms_c = torch.norm(self.centers, p=2, dim=-1, keepdim=True)
        ncenters = torch.div(self.centers, norms_c)
        logits = torch.matmul(nfeat, torch.transpose(ncenters, 0, 1))

        # Build the margin mask on the same device/dtype as the logits rather
        # than forcing CUDA, so the loss also runs on CPU-only machines.
        margin = torch.zeros_like(logits)
        margin.scatter_(1, torch.unsqueeze(label, dim=-1), self.m)
        margin_logits = self.s * (logits - margin)

        return logits, margin_logits

# Model

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
import os
import random
import numpy as np

class ConvLayer(nn.Module):
    """1-D convolution followed by optional normalisation and activation.

    Args:
        n_in: number of input channels.
        n_out: number of output channels.
        ks: convolution kernel size.
        ndim: accepted for interface compatibility; the layer is always a
            ``nn.Conv1d``.
        norm_type: ``None`` for no normalisation, otherwise one of "Batch",
            "Group", "Layer" or "Spectral", given either as a string or as an
            object whose ``.name`` is one of those (e.g. an Enum member).
            Unrecognised values leave the layer without normalisation.
        act_cls: activation class instantiated without arguments, or ``None``.
        bias: whether the convolution has a bias term.
    """

    def __init__(self, n_in, n_out, ks=1, ndim=1, norm_type=None, act_cls=None, bias=False):
        super(ConvLayer, self).__init__()
        self.conv = nn.Conv1d(n_in, n_out, kernel_size=ks, bias=bias)
        self.norm = None
        if norm_type:
            # Resolve by name so the layer does not depend on a NormType
            # symbol being defined in the surrounding module.
            # NOTE(review): assumes NormType, if defined elsewhere, is an Enum
            # or plain strings — confirm.
            kind = getattr(norm_type, "name", norm_type)
            if kind == "Batch":
                self.norm = nn.BatchNorm1d(n_out)
            elif kind == "Group":
                self.norm = nn.GroupNorm(1, n_out)
            elif kind == "Layer":
                self.norm = nn.LayerNorm(n_out)
            elif kind == "Spectral":
                # Spectral normalisation re-parametrises the convolution
                # weight; it is not a layer applied to activations.
                self.conv = nn.utils.spectral_norm(self.conv)
        self.activation = act_cls() if act_cls else None

    def forward(self, x):
        """Apply conv -> norm -> activation to ``x`` of shape (batch, n_in, length)."""
        x = self.conv(x)
        if self.norm is not None:
            if isinstance(self.norm, nn.LayerNorm):
                # LayerNorm normalises the trailing axis, so move channels
                # there and back.
                x = self.norm(x.transpose(1, 2)).transpose(1, 2)
            else:
                x = self.norm(x)
        if self.activation is not None:
            x = self.activation(x)
        return x

class SelfAttention(nn.Module):
    """Self attention layer for n_channels.

    Query and key projections reduce the channels to ``n_channels // 8``; the
    value projection keeps ``n_channels``. The attention output is scaled by
    the learnable ``gamma`` (initialised to 0) and added to the input.
    """

    def __init__(self, n_channels):
        super().__init__()
        reduced = n_channels // 8
        # Built in query, key, value order.
        self.query = self._conv(n_channels, reduced)
        self.key = self._conv(n_channels, reduced)
        self.value = self._conv(n_channels, n_channels)
        self.gamma = nn.Parameter(torch.tensor([0.]))

    def _conv(self, n_in, n_out):
        """Return a bias-free pointwise ConvLayer without norm or activation."""
        return ConvLayer(n_in, n_out, ks=1, ndim=1, norm_type=None, act_cls=None, bias=False)

    def forward(self, x):
        """Attend over all positions of ``x`` and return a tensor of the same shape.

        Every axis after the first two is flattened into one position axis for
        the attention computation and restored afterwards.
        """
        original_shape = x.size()
        flat = x.view(*original_shape[:2], -1)
        q = self.query(flat)
        k = self.key(flat)
        v = self.value(flat)
        # (positions x positions) scores, normalised over dim 1.
        scores = torch.bmm(q.transpose(1, 2), k)
        attn = F.softmax(scores, dim=1)
        mixed = self.gamma * torch.bmm(v, attn) + flat
        return mixed.view(*original_shape).contiguous()


class PreActBlock(nn.Module):
    """Pre-activation basic residual block (BN -> ReLU -> conv, twice).

    A 1x1 convolutional shortcut is created only when the stride is not 1 or
    the channel count changes; otherwise the raw input is used as residual.
    Extra positional and keyword arguments are accepted and ignored.
    """
    expansion = 1

    def __init__(self, in_planes, planes, stride, *args, **kwargs):
        super().__init__()
        out_planes = self.expansion * planes
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)

        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            projection = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
            self.shortcut = nn.Sequential(projection)

    def forward(self, x):
        pre = F.relu(self.bn1(x))
        # The projection (if any) consumes the pre-activated input.
        projection = getattr(self, 'shortcut', None)
        residual = x if projection is None else projection(pre)
        out = self.conv2(F.relu(self.bn2(self.conv1(pre))))
        out += residual
        return out


class PreActBottleneck(nn.Module):
    """Pre-activation bottleneck block (1x1 -> 3x3 -> 1x1 convolutions).

    The output has ``expansion * planes`` channels. A 1x1 convolutional
    shortcut is created only when the stride is not 1 or the channel count
    changes. Extra positional and keyword arguments are accepted and ignored.
    """
    expansion = 4

    def __init__(self, in_planes, planes, stride, *args, **kwargs):
        super().__init__()
        out_planes = self.expansion * planes
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)

        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            projection = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
            self.shortcut = nn.Sequential(projection)

    def forward(self, x):
        pre = F.relu(self.bn1(x))
        # The projection (if any) consumes the pre-activated input.
        projection = getattr(self, 'shortcut', None)
        residual = x if projection is None else projection(pre)
        hidden = self.conv1(pre)
        hidden = self.conv2(F.relu(self.bn2(hidden)))
        out = self.conv3(F.relu(self.bn3(hidden)))
        out += residual
        return out

def conv3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1."""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )
    return layer

def conv1x1(in_planes, out_planes, stride=1):
    """Return a bias-free 1x1 ``nn.Conv2d`` without padding."""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return layer

# Maps a ResNet depth name to [blocks per stage (4 stages), block class].
# ResNet.__init__ looks the entry up by its `resnet_type` string.
# NOTE(review): '28' uses the same stage layout as '34' — confirm intended.
RESNET_CONFIGS = {'18': [[2, 2, 2, 2], PreActBlock],
                  '28': [[3, 4, 6, 3], PreActBlock],
                  '34': [[3, 4, 6, 3], PreActBlock],
                  '50': [[3, 4, 6, 3], PreActBottleneck],
                  '101': [[3, 4, 23, 3], PreActBottleneck]
                  }

def setup_seed(random_seed, cudnn_deterministic=True):
    """Seed every random number generator used in this file.

    Seeds PyTorch, Python's ``random`` and NumPy, and exports the seed as
    ``PYTHONHASHSEED``. When CUDA is available, all CUDA generators are seeded
    as well, cuDNN determinism is set to ``cudnn_deterministic`` and cuDNN
    benchmarking is switched off.

    Args:
        random_seed: seed value shared by all generators.
        cudnn_deterministic: value assigned to
            ``torch.backends.cudnn.deterministic`` (CUDA only).
    """
    torch.manual_seed(random_seed)
    random.seed(random_seed)
    np.random.seed(random_seed)
    os.environ['PYTHONHASHSEED'] = str(random_seed)

    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed_all(random_seed)
    torch.backends.cudnn.deterministic = cudnn_deterministic
    torch.backends.cudnn.benchmark = False

class ResNet(nn.Module):
    """Pre-activation ResNet encoder with a self-attention stage.

    The network takes single-channel 2-D input (``conv1`` has one input
    channel), runs four residual stages, collapses the first spatial axis
    with a ``(num_nodes, 3)`` convolution, applies ``SelfAttention`` over the
    remaining axis, flattens and projects to an ``enc_dim`` embedding.

    Args:
        num_nodes: kernel height of ``conv5``.
        enc_dim: size of the returned embedding.
        resnet_type: key into ``RESNET_CONFIGS``.
        nclasses: output size of ``fc_mu``; values below 2 give one output.
    """

    def __init__(self, num_nodes, enc_dim, resnet_type='18', nclasses=2):
        super().__init__()
        self.in_planes = 16

        stage_depths, block = RESNET_CONFIGS[resnet_type]

        self._norm_layer = nn.BatchNorm2d

        # Stem
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(9, 3), stride=(3, 1), padding=(1, 1), bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.activation = nn.ReLU()

        # Residual stages (created in order; each updates self.in_planes)
        self.layer1 = self._make_layer(block, 64, stage_depths[0], stride=1)
        self.layer2 = self._make_layer(block, 128, stage_depths[1], stride=2)
        self.layer3 = self._make_layer(block, 256, stage_depths[2], stride=2)
        self.layer4 = self._make_layer(block, 512, stage_depths[3], stride=2)

        # Head
        self.conv5 = nn.Conv2d(512 * block.expansion, 256, kernel_size=(num_nodes, 3), stride=(1, 1),
                               padding=(0, 1), bias=False)
        self.bn5 = nn.BatchNorm2d(256)
        self.fl = nn.Flatten()
        self.fc = nn.Linear(256 * 2, enc_dim)
        n_outputs = nclasses if nclasses >= 2 else 1
        self.fc_mu = nn.Linear(enc_dim, n_outputs)

        # Custom initialisation runs before the attention and lazy layers
        # exist, so those two keep their own default initialisation.
        self.initialize_params()
        self.attention = SelfAttention(256)
        self.fc1 = nn.LazyLinear(512)

    def initialize_params(self):
        """Re-initialise conv, linear and batch-norm layers registered so far."""
        for module in self.modules():
            if isinstance(module, torch.nn.Conv2d):
                init.kaiming_normal_(module.weight, a=0, mode='fan_out')
            elif isinstance(module, torch.nn.Linear):
                init.kaiming_uniform_(module.weight)
            elif isinstance(module, (torch.nn.BatchNorm2d, torch.nn.BatchNorm1d)):
                module.weight.data.fill_(1)
                module.bias.data.zero_()

    def _make_layer(self, block, planes, num_blocks, stride=1):
        """Build one residual stage of ``num_blocks`` blocks.

        Only the first block uses ``stride``; the rest use stride 1.
        """
        norm_layer = self._norm_layer
        out_planes = planes * block.expansion
        downsample = None
        if stride != 1 or self.in_planes != out_planes:
            # Passed to the block positionally; the PreAct blocks in this file
            # accept and ignore the extra arguments.
            downsample = nn.Sequential(conv1x1(self.in_planes, out_planes, stride),
                                       norm_layer(out_planes))

        stage = [block(self.in_planes, planes, stride, downsample, 1, 64, 1, norm_layer)]
        self.in_planes = out_planes
        for _ in range(1, num_blocks):
            stage.append(
                block(self.in_planes, planes, 1, groups=1, base_width=64, dilation=False, norm_layer=norm_layer))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return ``(feat, mu)``: the embedding and the ``fc_mu`` output."""
        x = self.activation(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        x = self.conv5(x)

        batch, channels = x.shape[0], x.shape[1]
        x = self.activation(self.bn5(x)).view(batch, channels, -1)

        attended = self.attention(x)
        hidden = self.fc1(self.fl(attended))
        feat = self.fc(hidden)

        mu = self.fc_mu(feat)

        return feat, mu


In [8]:
#OLD VERSION
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
import os
import random
import numpy as np

class SelfAttention(nn.Module):
    """Attentive pooling over the second axis of a (batch, time, hidden) tensor.

    A single learned vector scores every time step; the scores go through
    ``tanh`` and a softmax over time and weight the inputs.

    Args:
        hidden_size: size of the last input axis.
        mean_only: if True return only the weighted sum over time; otherwise
            concatenate the weighted sum with the standard deviation of the
            weighted frames (output size ``2 * hidden_size``).
    """

    def __init__(self, hidden_size, mean_only=False):
        super(SelfAttention, self).__init__()

        self.hidden_size = hidden_size
        self.att_weights = nn.Parameter(torch.empty(1, hidden_size), requires_grad=True)

        self.mean_only = mean_only

        init.kaiming_uniform_(self.att_weights)

    def forward(self, inputs):
        """Pool ``inputs`` of shape (batch, time, hidden) over the time axis."""
        batch_size = inputs.size(0)
        # (batch, time, hidden) x (batch, hidden, 1) -> (batch, time, 1)
        projector = self.att_weights.permute(1, 0).unsqueeze(0).expand(batch_size, -1, -1)
        weights = torch.bmm(inputs, projector)

        # Keep the trailing singleton axis and let broadcasting apply the
        # weights. Squeezing it would also drop the batch or time axis when
        # either has length 1.
        attentions = F.softmax(torch.tanh(weights), dim=1)
        weighted = inputs * attentions

        if self.mean_only:
            return weighted.sum(1)

        # Small noise is added before taking the standard deviation. It is
        # drawn on the CPU and moved to the input's device.
        noise = 1e-5 * torch.randn(weighted.size())
        noise = noise.to(weighted.device)
        avg_repr, std_repr = weighted.sum(1), (weighted + noise).std(1)

        representations = torch.cat((avg_repr, std_repr), 1)

        return representations


class PreActBlock(nn.Module):
    """Pre-activation basic residual block (BN -> ReLU -> conv, twice).

    A 1x1 convolutional shortcut is created only when the stride is not 1 or
    the channel count changes; otherwise the raw input is used as residual.
    Extra positional and keyword arguments are accepted and ignored.
    """
    expansion = 1

    def __init__(self, in_planes, planes, stride, *args, **kwargs):
        super().__init__()
        out_planes = self.expansion * planes
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)

        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            projection = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
            self.shortcut = nn.Sequential(projection)

    def forward(self, x):
        pre = F.relu(self.bn1(x))
        # The projection (if any) consumes the pre-activated input.
        projection = getattr(self, 'shortcut', None)
        residual = x if projection is None else projection(pre)
        out = self.conv2(F.relu(self.bn2(self.conv1(pre))))
        out += residual
        return out


class PreActBottleneck(nn.Module):
    """Pre-activation bottleneck block (1x1 -> 3x3 -> 1x1 convolutions).

    The output has ``expansion * planes`` channels. A 1x1 convolutional
    shortcut is created only when the stride is not 1 or the channel count
    changes. Extra positional and keyword arguments are accepted and ignored.
    """
    expansion = 4

    def __init__(self, in_planes, planes, stride, *args, **kwargs):
        super().__init__()
        out_planes = self.expansion * planes
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)

        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            projection = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
            self.shortcut = nn.Sequential(projection)

    def forward(self, x):
        pre = F.relu(self.bn1(x))
        # The projection (if any) consumes the pre-activated input.
        projection = getattr(self, 'shortcut', None)
        residual = x if projection is None else projection(pre)
        hidden = self.conv1(pre)
        hidden = self.conv2(F.relu(self.bn2(hidden)))
        out = self.conv3(F.relu(self.bn3(hidden)))
        out += residual
        return out

def conv3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1."""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )
    return layer

def conv1x1(in_planes, out_planes, stride=1):
    """Return a bias-free 1x1 ``nn.Conv2d`` without padding."""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return layer

# Maps a ResNet depth name to [blocks per stage (4 stages), block class].
# ResNet.__init__ looks the entry up by its `resnet_type` string.
# NOTE(review): '28' uses the same stage layout as '34' — confirm intended.
RESNET_CONFIGS = {'18': [[2, 2, 2, 2], PreActBlock],
                  '28': [[3, 4, 6, 3], PreActBlock],
                  '34': [[3, 4, 6, 3], PreActBlock],
                  '50': [[3, 4, 6, 3], PreActBottleneck],
                  '101': [[3, 4, 23, 3], PreActBottleneck]
                  }

def setup_seed(random_seed, cudnn_deterministic=True):
    """Seed every random number generator used in this file.

    Seeds PyTorch, Python's ``random`` and NumPy, and exports the seed as
    ``PYTHONHASHSEED``. When CUDA is available, all CUDA generators are seeded
    as well, cuDNN determinism is set to ``cudnn_deterministic`` and cuDNN
    benchmarking is switched off.

    Args:
        random_seed: seed value shared by all generators.
        cudnn_deterministic: value assigned to
            ``torch.backends.cudnn.deterministic`` (CUDA only).
    """
    torch.manual_seed(random_seed)
    random.seed(random_seed)
    np.random.seed(random_seed)
    os.environ['PYTHONHASHSEED'] = str(random_seed)

    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed_all(random_seed)
    torch.backends.cudnn.deterministic = cudnn_deterministic
    torch.backends.cudnn.benchmark = False

class ResNet(nn.Module):
    """Pre-activation ResNet encoder with attentive pooling (older variant).

    The network takes single-channel 2-D input (``conv1`` has one input
    channel), runs four residual stages, collapses the first spatial axis
    with a ``(num_nodes, 3)`` convolution and pools the remaining axis with
    ``SelfAttention`` before projecting to an ``enc_dim`` embedding.

    Args:
        num_nodes: kernel height of ``conv5``.
        enc_dim: size of the returned embedding.
        resnet_type: key into ``RESNET_CONFIGS``.
        nclasses: output size of ``fc_mu``; values below 2 give one output.
    """

    def __init__(self, num_nodes, enc_dim, resnet_type='18', nclasses=2):
        super().__init__()
        self.in_planes = 16

        stage_depths, block = RESNET_CONFIGS[resnet_type]

        self._norm_layer = nn.BatchNorm2d

        # Stem
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(9, 3), stride=(3, 1), padding=(1, 1), bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.activation = nn.ReLU()

        # Residual stages (created in order; each updates self.in_planes)
        self.layer1 = self._make_layer(block, 64, stage_depths[0], stride=1)
        self.layer2 = self._make_layer(block, 128, stage_depths[1], stride=2)
        self.layer3 = self._make_layer(block, 256, stage_depths[2], stride=2)
        self.layer4 = self._make_layer(block, 512, stage_depths[3], stride=2)

        # Head
        self.conv5 = nn.Conv2d(512 * block.expansion, 256, kernel_size=(num_nodes, 3), stride=(1, 1),
                               padding=(0, 1), bias=False)
        self.bn5 = nn.BatchNorm2d(256)
        self.fc = nn.Linear(256 * 2, enc_dim)
        n_outputs = nclasses if nclasses >= 2 else 1
        self.fc_mu = nn.Linear(enc_dim, n_outputs)

        # Custom initialisation runs before the attention module exists, so
        # the attention keeps its own initialisation.
        self.initialize_params()
        self.attention = SelfAttention(256)

    def initialize_params(self):
        """Re-initialise conv, linear and batch-norm layers registered so far."""
        for module in self.modules():
            if isinstance(module, torch.nn.Conv2d):
                init.kaiming_normal_(module.weight, a=0, mode='fan_out')
            elif isinstance(module, torch.nn.Linear):
                init.kaiming_uniform_(module.weight)
            elif isinstance(module, (torch.nn.BatchNorm2d, torch.nn.BatchNorm1d)):
                module.weight.data.fill_(1)
                module.bias.data.zero_()

    def _make_layer(self, block, planes, num_blocks, stride=1):
        """Build one residual stage of ``num_blocks`` blocks.

        Only the first block uses ``stride``; the rest use stride 1.
        """
        norm_layer = self._norm_layer
        out_planes = planes * block.expansion
        downsample = None
        if stride != 1 or self.in_planes != out_planes:
            # Passed to the block positionally; the PreAct blocks in this file
            # accept and ignore the extra arguments.
            downsample = nn.Sequential(conv1x1(self.in_planes, out_planes, stride),
                                       norm_layer(out_planes))

        stage = [block(self.in_planes, planes, stride, downsample, 1, 64, 1, norm_layer)]
        self.in_planes = out_planes
        for _ in range(1, num_blocks):
            stage.append(
                block(self.in_planes, planes, 1, groups=1, base_width=64, dilation=False, norm_layer=norm_layer))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return ``(feat, mu)``: the embedding and the ``fc_mu`` output."""
        x = self.activation(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        x = self.conv5(x)

        batch, channels = x.shape[0], x.shape[1]
        x = self.activation(self.bn5(x)).view(batch, channels, -1)

        # The attention module receives the channel axis last.
        pooled = self.attention(x.permute(0, 2, 1).contiguous())
        feat = self.fc(pooled)

        mu = self.fc_mu(feat)

        return feat, mu

In [13]:
def obtain_asv_error_rates(tar_asv, non_asv, spoof_asv, asv_threshold):
    """Compute the ASV error rates at a fixed decision threshold.

    All three rates are computed with ``np.sum`` so the counting is vectorized
    and consistent across the three score sets (the builtin ``sum`` iterates
    element by element in Python).

    Args:
        tar_asv: NumPy array of ASV scores for target trials.
        non_asv: NumPy array of ASV scores for non-target trials.
        spoof_asv: NumPy array of ASV scores for spoof trials (may be empty).
        asv_threshold: Scores ``>= asv_threshold`` are accepted.

    Returns:
        Tuple ``(Pfa_asv, Pmiss_asv, Pmiss_spoof_asv)``:
        the fraction of non-targets accepted, the fraction of targets
        rejected, and the fraction of spoofs rejected (``None`` when
        ``spoof_asv`` is empty).
    """
    # False alarm and miss rates for ASV
    Pfa_asv = np.sum(non_asv >= asv_threshold) / non_asv.size
    Pmiss_asv = np.sum(tar_asv < asv_threshold) / tar_asv.size

    # Rate of rejecting spoofs in ASV
    if spoof_asv.size == 0:
        Pmiss_spoof_asv = None
    else:
        Pmiss_spoof_asv = np.sum(spoof_asv < asv_threshold) / spoof_asv.size

    return Pfa_asv, Pmiss_asv, Pmiss_spoof_asv


def compute_det_curve(target_scores, nontarget_scores):
    """Compute the points of a detection-error-tradeoff curve.

    Target and non-target scores are pooled and sorted in ascending order
    (stable sort). Entry ``i`` of each returned array describes the operating
    point at which the ``i`` lowest pooled scores are rejected.

    Args:
        target_scores: NumPy array of scores for target trials.
        nontarget_scores: NumPy array of scores for non-target trials.

    Returns:
        Tuple ``(frr, far, thresholds)``, each of length ``n_scores + 1``:
        false rejection rates, false acceptance rates, and the thresholds
        (the sorted scores, preceded by the lowest score minus 0.001).
    """
    n_target = target_scores.size
    n_nontarget = nontarget_scores.size

    pooled = np.concatenate((target_scores, nontarget_scores))
    is_target = np.concatenate((np.ones(n_target), np.zeros(n_nontarget)))

    # Stable ascending sort of the pooled scores; labels follow the same order.
    order = np.argsort(pooled, kind='mergesort')
    is_target = is_target[order]
    sorted_scores = pooled[order]

    # Running counts of rejected targets / still-accepted non-targets.
    targets_rejected = np.cumsum(is_target)
    nontargets_rejected = np.arange(1, n_target + n_nontarget + 1) - targets_rejected
    nontargets_accepted = n_nontarget - nontargets_rejected

    frr = np.concatenate(([0], targets_rejected / n_target))
    far = np.concatenate(([1], nontargets_accepted / n_nontarget))
    thresholds = np.concatenate(([sorted_scores[0] - 0.001], sorted_scores))

    return frr, far, thresholds


def compute_eer(target_scores, nontarget_scores):
    """Return the equal error rate (as a fraction) and its threshold.

    The EER is taken at the DET-curve point where the false rejection and
    false acceptance rates are closest, and is reported as the mean of the
    two rates at that point.
    """
    frr, far, thresholds = compute_det_curve(target_scores, nontarget_scores)
    crossing = np.argmin(np.abs(frr - far))
    eer = np.mean((frr[crossing], far[crossing]))
    return eer, thresholds[crossing]

In [14]:
def compute_mfcc_feats(x, sr=16000, n_mfcc=24):
    """Compute MFCCs stacked with their first- and second-order deltas.

    Args:
        x: 1-D audio waveform.
        sr: Sampling rate of ``x`` in Hz. Defaults to 16000, the value that
            was previously hard-coded.
        n_mfcc: Number of MFCC coefficients. Defaults to 24, the value that
            was previously hard-coded.

    Returns:
        Array with ``3 * n_mfcc`` rows: the MFCCs, their deltas, and the
        deltas of those deltas, concatenated along axis 0.
    """
    mfcc = librosa.feature.mfcc(y=x, sr=sr, n_mfcc=n_mfcc)
    delta = librosa.feature.delta(mfcc)
    # Second-order term is the delta of the delta features.
    delta2 = librosa.feature.delta(delta)
    feats = np.concatenate((mfcc, delta, delta2), axis=0)
    return feats

In [15]:
def get_log_spectrum(x):
    """Return the log-power (dB) STFT spectrogram of waveform ``x``.

    The STFT uses a 2048-point FFT, a 2048-sample window and a hop of 512
    samples; the squared magnitude is converted to decibels.
    """
    stft = librosa.core.stft(x, n_fft=2048, win_length=2048, hop_length=512)
    power = np.abs(stft) ** 2
    return librosa.power_to_db(power)

In [18]:
import os
import json
import shutil
from collections import defaultdict
from tqdm import tqdm
import numpy as np
import torch
from torch import nn
from torchvision import transforms

def initParams():
    """Build the training configuration and prepare the output folder.

    Unless ``continue_training`` is set, the output folder ``out_fold`` is
    wiped and recreated with an empty ``checkpoint`` sub-folder, and the
    ``train_loss.log`` / ``dev_loss.log`` files are started afresh. When
    ``continue_training`` is set the folder must already exist.

    Returns:
        dict: The hyper-parameters defined below plus two derived entries:
        ``'cuda'`` (bool, whether CUDA is available) and ``'device'``
        (``torch.device``, ``cuda`` if available, otherwise ``cpu``).

    Raises:
        AssertionError: If ``continue_training`` is set but ``out_fold`` does
            not exist.
    """
    args = {
        'num_epochs': 50,
        'batch_size': 64,
        'lr': 0.0003,
        'lr_decay': 0.5,
        'interval': 10,
        'beta_1': 0.9,
        'beta_2': 0.999,
        'eps': 1e-8,
        # NOTE(review): 'gpu', 'num_workers' and 'seed' are not read anywhere
        # in this function.
        'gpu': "1",
        'num_workers': 0,
        'seed': 598,
        'add_loss': "ocsoftmax",
        'weight_loss': 1,
        'r_real': 0.9,
        'r_fake': 0.2,
        'alpha': 20,
        'continue_training': False,
        'out_fold':'../DL/models/resnet',
        'enc_dim':256
    }

    out_fold = args['out_fold']

    if args['continue_training']:
        assert os.path.exists(out_fold)
    else:
        # Start from a clean output folder. Removing the folder also removes
        # any old checkpoint directory, so it only has to be created once.
        if os.path.exists(out_fold):
            shutil.rmtree(out_fold)
        os.makedirs(os.path.join(out_fold, 'checkpoint'))

        with open(os.path.join(out_fold, 'train_loss.log'), 'w') as file:
            file.write("Start recording training loss ...\n")
        with open(os.path.join(out_fold, 'dev_loss.log'), 'w') as file:
            file.write("Start recording validation loss ...\n")

    args['cuda'] = torch.cuda.is_available()
    print(args['cuda'])
    # Querying the current device raises when CUDA is unavailable, so only
    # report it when there is a GPU; the CPU fallback below still applies.
    if args['cuda']:
        print(torch.cuda.current_device())
    args['device'] = torch.device("cuda" if args['cuda'] else "cpu")

    return args


def adjust_learning_rate(args, optimizer, epoch_num):
    """Apply step decay to every parameter group of ``optimizer``.

    The learning rate is ``args['lr']`` multiplied by ``args['lr_decay']``
    once for every ``args['interval']`` completed epochs.
    """
    decay_steps = epoch_num // args['interval']
    new_lr = args['lr'] * (args['lr_decay'] ** decay_steps)
    for group in optimizer.param_groups:
        group['lr'] = new_lr

def _model_device(model):
    """Return the device that holds ``model``'s first parameter."""
    return next(model.parameters()).device


def _save_model_pair(feat_model, loss_model, feat_path, loss_path):
    """Save the feature model and, when one is in use, the loss module."""
    torch.save(feat_model, feat_path)
    if loss_model is not None:
        torch.save(loss_model, loss_path)


def train(args,transforms,dev_set,train_set):
    """Train the ResNet anti-spoofing model and validate it after every epoch.

    Each epoch runs one pass over ``train_set``, then scores ``dev_set`` and
    computes the validation EER (label 0 is the target class, label 1 the
    non-target class). The model of every epoch is saved under
    ``<out_fold>/checkpoint``; the model with the lowest validation EER so far
    is additionally saved directly under ``<out_fold>``.

    Args:
        args: Configuration dict as produced by ``initParams``. Keys read
            here: ``device``, ``add_loss``, ``out_fold``, ``enc_dim``,
            ``continue_training``, ``lr``, ``beta_1``, ``beta_2``, ``eps``,
            ``batch_size``, ``alpha``, ``r_real``, ``r_fake``,
            ``weight_loss``, ``num_epochs`` (plus ``lr_decay`` and
            ``interval`` through ``adjust_learning_rate``).
        transforms: Unused; kept so existing callers keep working.
        dev_set: Validation dataset yielding ``(features, label)`` pairs.
        train_set: Training dataset yielding ``(features, label)`` pairs.

    Returns:
        Tuple ``(lfcc_model, loss_model)``. ``loss_model`` is the OC-Softmax
        or AM-Softmax module, or ``None`` when ``add_loss`` selects neither.
    """
    device = args['device']
    add_loss = args['add_loss']
    out_fold = args['out_fold']
    checkpoint_dir = os.path.join(out_fold, 'checkpoint')

    lfcc_model = ResNet(3, args['enc_dim'], resnet_type='18', nclasses=2).to(device)
    if args['continue_training']:
        # NOTE(review): torch.load unpickles a whole module; only resume from
        # checkpoint files you trust.
        lfcc_model = torch.load(os.path.join(out_fold, 'anti-spoofing_lfcc_model.pt')).to(device)

    lfcc_optimizer = torch.optim.Adam(lfcc_model.parameters(), lr=args['lr'],
                                      betas=(args['beta_1'], args['beta_2']), eps=args['eps'], weight_decay=0.0005)

    trainDataLoader = DataLoader(train_set, batch_size=args['batch_size'], shuffle=True)
    # EER does not depend on sample order, so the validation set is not shuffled.
    valDataLoader = DataLoader(dev_set, batch_size=args['batch_size'], shuffle=False)

    criterion = nn.CrossEntropyLoss()

    # Optional learnable loss head with its own optimizer. Defined up front so
    # the return value is always bound, even when no epoch is run.
    loss_model = None
    loss_optimizer = None
    if add_loss == "amsoftmax":
        loss_model = AMSoftmax(2, args['enc_dim'], s=args['alpha'], m=args['r_real']).to(device)
        loss_model.train()
        # This initial rate is overwritten by adjust_learning_rate each epoch.
        loss_optimizer = torch.optim.SGD(loss_model.parameters(), lr=0.01)
    elif add_loss == "ocsoftmax":
        loss_model = OCSoftmax(args['enc_dim'], r_real=args['r_real'], r_fake=args['r_fake'], alpha=args['alpha']).to(device)
        loss_model.train()
        loss_optimizer = torch.optim.SGD(loss_model.parameters(), lr=args['lr'])

    early_stop_cnt = 0
    prev_eer = 1e8

    monitor_loss = add_loss

    for epoch_num in range(args['num_epochs']):
        lfcc_model.train()
        # nn.Module itself has no ``device`` attribute, so report where the
        # parameters actually live.
        print(_model_device(lfcc_model))
        # Per-batch training losses are collected but not written anywhere at
        # the moment (the train_loss.log writer is disabled).
        trainlossDict = defaultdict(list)
        devlossDict = defaultdict(list)
        adjust_learning_rate(args, lfcc_optimizer, epoch_num)
        if loss_optimizer is not None:
            adjust_learning_rate(args, loss_optimizer, epoch_num)
        print('\nEpoch: %d ' % (epoch_num + 1))

        for lfcc, labels in tqdm(trainDataLoader):
            lfcc = lfcc.unsqueeze(1).float().to(device)
            labels = labels.long().to(device)
            feats, lfcc_outputs = lfcc_model(lfcc)

            if add_loss == "softmax":
                lfcc_loss = criterion(lfcc_outputs, labels)
                logged_loss = lfcc_loss
            elif add_loss == "ocsoftmax":
                ocsoftmaxloss, _ = loss_model(feats, labels)
                lfcc_loss = ocsoftmaxloss * args['weight_loss']
                logged_loss = ocsoftmaxloss
            elif add_loss == "amsoftmax":
                _, moutputs = loss_model(feats, labels)
                lfcc_loss = criterion(moutputs, labels)
                logged_loss = lfcc_loss
            else:
                # Unknown loss name: no parameter update is performed.
                continue

            lfcc_optimizer.zero_grad()
            if loss_optimizer is not None:
                loss_optimizer.zero_grad()
            trainlossDict[add_loss].append(logged_loss.item())
            lfcc_loss.backward()
            lfcc_optimizer.step()
            if loss_optimizer is not None:
                loss_optimizer.step()

        lfcc_model.eval()
        with torch.no_grad():
            idx_loader, score_loader = [], []
            for lfcc, labels in valDataLoader:
                lfcc = lfcc.unsqueeze(1).float().to(device)
                labels = labels.long().to(device)
                feats, lfcc_outputs = lfcc_model(lfcc)

                # Default score: softmax probability of class 0.
                lfcc_loss = criterion(lfcc_outputs.float(), labels)
                score = F.softmax(lfcc_outputs, dim=1)[:, 0]

                if add_loss == "softmax":
                    devlossDict["softmax"].append(lfcc_loss.item())
                elif add_loss == "amsoftmax":
                    outputs, moutputs = loss_model(feats, labels)
                    lfcc_loss = criterion(moutputs, labels)
                    score = F.softmax(outputs, dim=1)[:, 0]
                    devlossDict[add_loss].append(lfcc_loss.item())
                elif add_loss == "ocsoftmax":
                    ocsoftmaxloss, score = loss_model(feats, labels)
                    devlossDict[add_loss].append(ocsoftmaxloss.item())
                idx_loader.append(labels)
                score_loader.append(score)

            scores = torch.cat(score_loader, 0).data.cpu().numpy()
            labels = torch.cat(idx_loader, 0).data.cpu().numpy()
            val_eer = compute_eer(scores[labels == 0], scores[labels == 1])[0]

            with open(os.path.join(out_fold, "dev_loss.log"), "a") as log:
                log.write(
                    str(epoch_num) + "\t" + str(np.nanmean(devlossDict[monitor_loss])) + "\t" + str(val_eer) + "\n")
            print("Val EER: {}".format(val_eer))

        # Per-epoch checkpoint.
        _save_model_pair(
            lfcc_model, loss_model,
            os.path.join(checkpoint_dir, 'anti-spoofing_lfcc_model_%d.pt' % (epoch_num + 1)),
            os.path.join(checkpoint_dir, 'anti-spoofing_loss_model_%d.pt' % (epoch_num + 1)))

        if val_eer < prev_eer:
            # New best validation EER: refresh the "best" models.
            _save_model_pair(
                lfcc_model, loss_model,
                os.path.join(out_fold, 'anti-spoofing_lfcc_model.pt'),
                os.path.join(out_fold, 'anti-spoofing_loss_model.pt'))
            prev_eer = val_eer
            early_stop_cnt = 0
        else:
            early_stop_cnt += 1

        # NOTE(review): the patience of 100 epochs and the "- 19" offset do
        # not match each other -- confirm the intended values.
        if early_stop_cnt == 100:
            with open(os.path.join(out_fold, 'args.json'), 'a') as res_file:
                res_file.write('\nTrained Epochs: %d\n' % (epoch_num - 19))
            break

    return lfcc_model, loss_model

In [17]:
# Import under an alias so that the assignment to ``transforms`` below does
# not overwrite the torchvision module itself. Rebinding the module name made
# this cell fail with an AttributeError when it was run a second time.
from torchvision import transforms as tv_transforms

feature_fn = compute_mfcc_feats
# Pipeline: pad/truncate the waveform -> normalize -> extract features -> Tensor.
# ``feature_fn`` is looked up at call time, so reassigning it later changes
# the features produced by this pipeline.
transforms = tv_transforms.Compose([
    lambda x: pad(x),
    lambda x: librosa.util.normalize(x),
    lambda x: feature_fn(x),
    lambda x: Tensor(x),
])

In [None]:
# Build the configuration (this also resets the output folder), then train.
# Note that ``train`` returns a ``(feature_model, loss_model)`` tuple, so
# ``model`` holds both.
args = initParams()
model = train(
    args=args,
    transforms=transforms,
    dev_set=dev_set,
    train_set=train_set,
)

True
0





Epoch: 1 


 46%|████▌     | 387/844 [03:36<03:49,  1.99it/s]

# Test

In [38]:
def test_model_ocsoftmax(feat_model_path, loss_model_path, part, add_loss, device):
    """Score the test set with a saved model and dump scores and embeddings.

    Writes three files under ``../DL/models/resnet/``:

    * ``checkpoint_cm_score.txt`` -- one ``"<filename> <label> <score>"`` line
      per sample, where label is ``spoof`` or ``bonafide``.
    * ``feats_mfcc_resnet.tsv`` -- tab-separated embeddings of a small subset
      of samples.
    * ``meta_mfcc_resnet.tsv`` -- the label of each embedding row.

    Args:
        feat_model_path: Path of the pickled feature model.
        loss_model_path: Path of the pickled loss module.
        part: Unused; kept for interface compatibility.
        add_loss: ``"ocsoftmax"`` or ``"amsoftmax"`` to take the score from
            the loss module; anything else uses the softmax probability of
            class 0 from the feature model.
        device: ``torch.device`` on which the models are loaded and run.
    """
    # Load onto the requested device rather than a hard-coded 'cuda', so that
    # the function also works on CPU-only machines.
    # NOTE(review): torch.load unpickles whole modules; only load trusted files.
    model = torch.load(feat_model_path, map_location=device).to(device)
    loss_model = torch.load(loss_model_path, map_location=device).to(device)

    # NOTE(review): the dataset is built with is_train=True although it is
    # used for testing -- confirm this selects the intended split.
    test_set = SoundFeatureDataset('../Data/PA','../DL/mfcc',is_logical=False, is_train=True)
    testDataLoader = DataLoader(test_set, batch_size=10, shuffle=False)
    model.eval()

    model_feats = []
    model_labels = []
    count = 0
    prev_label = None

    score_path = os.path.join('../DL/models/resnet/', 'checkpoint_cm_score.txt')
    # Inference only: no_grad avoids building autograd graphs for every batch.
    with open(score_path, 'w') as cm_score_file, torch.no_grad():
        for lfcc, labels, filenames in tqdm(testDataLoader):
            lfcc = lfcc.unsqueeze(1).float().to(device)
            labels = labels.to(device)
            feats, lfcc_outputs = model(lfcc)

            # Default score: softmax probability of class 0.
            score = F.softmax(lfcc_outputs, dim=1)[:, 0]

            if add_loss == "ocsoftmax":
                _, score = loss_model(feats, labels)
            elif add_loss == "amsoftmax":
                outputs, _ = loss_model(feats, labels)
                score = F.softmax(outputs, dim=1)[:, 0]

            for j in range(labels.size(0)):
                label = labels[j].item()
                label_name = "spoof" if label else "bonafide"
                # Keep an embedding only when the label differs from the
                # previously kept one, while the running count is <= 300.
                if count <= 300 and prev_label != label:
                    model_feats.append(feats[j].detach().cpu().numpy())
                    model_labels.append(label_name)
                    prev_label = label
                    count += 1
                cm_score_file.write(
                    '%s %s %s\n' % (filenames[j], label_name, score[j].item()))

    output_file = os.path.join('../DL/models/resnet/', 'feats_' + 'mfcc_resnet' + ".tsv")
    meta_file = os.path.join('../DL/models/resnet/', 'meta_' + 'mfcc_resnet' + ".tsv")

    with open(output_file, 'w') as f:
        for feat in model_feats:
            sample_str = '\t'.join(str(e) for e in feat)
            f.write(f"{sample_str}\n")

    with open(meta_file, 'w') as meta:
        for label_name in model_labels:
            meta.write(f"{label_name}\n")

    

def test_ocsoftmax(model_dir, model, loss_model, add_loss, device):
    """Resolve both model file names inside ``model_dir`` and run the evaluation.

    ``model`` and ``loss_model`` are file names; they are joined with
    ``model_dir`` and passed to ``test_model_ocsoftmax`` with ``part="eval"``.
    """
    feat_model_path, loss_model_path = (
        os.path.join(model_dir, file_name) for file_name in (model, loss_model)
    )
    test_model_ocsoftmax(feat_model_path, loss_model_path, "eval", add_loss, device)

In [39]:
# Evaluate the saved MFCC models, on the GPU when one is available.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model_dir = '../Data/models/resnet/v1'
model = 'anti-spoofing_mfcc_model.pt'
# NOTE(review): this file name contains a space before ".pt" -- confirm it
# matches the file on disk.
loss_model = 'anti-spoofing_loss_mfcc_model .pt'
add_loss = 'ocsoftmax'
test_ocsoftmax(model_dir=model_dir, model=model, loss_model=loss_model,
               add_loss=add_loss, device=device)

FileNotFoundError: [Errno 2] No such file or directory: '../Data/models\\anti-spoofing_mfcc_model.pt'

In [ ]:
from embedding import *
import numpy as np
def compute_det_curve(target_scores, nontarget_scores):
    """Return ``(frr, far, thresholds)`` for a detection-error-tradeoff curve.

    The pooled scores are sorted in ascending order with a stable sort.
    Index ``i`` of every output corresponds to rejecting the ``i`` lowest
    pooled scores: ``frr[i]`` is the fraction of targets rejected, ``far[i]``
    the fraction of non-targets still accepted. ``thresholds`` holds the
    sorted scores preceded by the lowest score minus 0.001.

    Args:
        target_scores: NumPy array of scores for target trials.
        nontarget_scores: NumPy array of scores for non-target trials.
    """
    n_target = target_scores.size
    n_nontarget = nontarget_scores.size

    pooled = np.concatenate((target_scores, nontarget_scores))
    is_target = np.concatenate((np.ones(n_target), np.zeros(n_nontarget)))

    # Stable ascending sort; the labels are reordered alongside the scores.
    order = np.argsort(pooled, kind='mergesort')
    is_target = is_target[order]
    sorted_scores = pooled[order]

    targets_rejected = np.cumsum(is_target)
    nontargets_rejected = np.arange(1, n_target + n_nontarget + 1) - targets_rejected
    nontargets_accepted = n_nontarget - nontargets_rejected

    frr = np.concatenate(([0], targets_rejected / n_target))
    far = np.concatenate(([1], nontargets_accepted / n_nontarget))
    thresholds = np.concatenate(([sorted_scores[0] - 0.001], sorted_scores))

    return frr, far, thresholds


def compute_eer(target_scores, nontarget_scores):
    """Return the equal error rate in percent and the matching threshold.

    The EER is the mean of the false rejection and false acceptance rates at
    the DET-curve point where the two are closest, multiplied by 100.
    """
    frr, far, thresholds = compute_det_curve(target_scores, nontarget_scores)
    crossing = np.argmin(np.abs(frr - far))
    eer = np.mean((frr[crossing], far[crossing]))
    return eer * 100, thresholds[crossing]


# Compute the EER from a score file whose lines have the form
# "<name> <bonafide|spoof> <score>" (fields separated by single spaces).
scores_fname = 'res/lfcc_renset18_cm_score.txt'
target_scores = []     # scores of bonafide trials
nontarget_scores = []  # scores of spoof trials
scores = []
y_true = []

# Read through a context manager so the file handle is always closed.
with open(scores_fname) as score_file:
    lines_score = score_file.readlines()

for line_score in lines_score:
    fields = line_score.strip().split(' ')
    if fields == ['']:
        # Skip blank lines (e.g. a trailing newline at the end of the file).
        continue
    score = float(fields[2])
    y = int(fields[1] == 'spoof')
    scores.append(score)
    y_true.append(y)
    if y == 0:
        target_scores.append(score)
    else:
        nontarget_scores.append(score)

# compute_eer returns (eer, threshold); both are printed.
eer_roc = compute_eer(np.array(target_scores), np.array(nontarget_scores))
print("eer(%):", eer_roc)

In [14]:
# feature_fn = extract_lfcc
# transforms = transforms.Compose([
# lambda x: pad(x),
# lambda x: librosa.util.normalize(x),
# lambda x: feature_fn(x),
# lambda x: Tensor(x)
# ])

In [15]:
# def test_model_ocsoftmax(feat_model_path, loss_model_path, part, add_loss, device):
#     dirname = os.path.dirname
#     basename = os.path.splitext(os.path.basename(feat_model_path))[0]
#     if "checkpoint" in dirname(feat_model_path):
#         dir_path = dirname(dirname(feat_model_path))
#     else:
#         dir_path = dirname(feat_model_path)
#     print(feat_model_path)
#     print(loss_model_path)
#     model=torch.load(feat_model_path,map_location='cuda').to(device)
#     loss_model=torch.load(loss_model_path,map_location='cuda').to(device)
#     is_eval = (part == 'eval')
#     test_set = ASVDataset('/kaggle/input/asvpoof-2019-dataset/LA/LA', is_train=False, is_eval=True, is_logical=True, transform=transforms,
#                           feature_name='lfcc',random_sample=30000)
#     testDataLoader = DataLoader(test_set, batch_size=10, shuffle=False)
#     model.eval()

#     with open(os.path.join('/kaggle/working/', 'checkpoint_cm_score.txt'), 'w') as cm_score_file:
#         for i, (lfcc, labels,filenames) in enumerate(tqdm(testDataLoader)):
#             lfcc = lfcc.unsqueeze(1).float().to(device)
#             labels = labels.to(device)
#             feats, lfcc_outputs = model(lfcc)

#             score = F.softmax(lfcc_outputs)[:, 0]

#             if add_loss == "ocsoftmax":
#                 ang_isoloss, score = loss_model(feats, labels)
#             elif add_loss == "amsoftmax":
#                 outputs, moutputs = loss_model(feats, labels)
#                 score = F.softmax(outputs, dim=1)[:, 0]

#             for j in range(labels.size(0)):
#                 cm_score_file.write(
#                     '%s %s %s\n' % (filenames[j],"spoof" if labels[j].data.cpu().numpy() else "bonafide",score[j].item()))


# def test_ocsoftmax(model_dir, model, loss_model, add_loss, device):
#     model_path = os.path.join(model_dir, model)
#     loss_model_path = os.path.join(model_dir, loss_model)
#     test_model_ocsoftmax(model_path, loss_model_path, "eval", add_loss, device)


# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model_dir = '/kaggle/input/resnt_ossoftamx/pytorch/resnet_models/1'
# model = 'anti-spoofing_lfcc_model.pt'
# loss_model = 'anti-spoofing_loss_model.pt'
# add_loss = 'ocsoftmax'
# test_ocsoftmax(model_dir, model, loss_model, add_loss,device)