In [None]:
import tensorflow as tf
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.regularizers import *
from tensorflow.keras.constraints import unit_norm
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import Sequence
import tensorflow.keras.backend as K
import random
import os
import logmelspectr_params as params
import pandas as pd
import numpy as np
import itertools
import os

In [None]:
# Expose only CUDA device 0 to this process, then enable memory growth on
# every GPU TensorFlow reports.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    # An empty device list simply skips the loop, so no separate guard is needed.
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # Report the failure and carry on rather than aborting the notebook.
    print(err)

### Dataset info

In [None]:
# Column names assigned to the space-separated protocol files; the "null"
# column is discarded right after loading.
PROTOCOL_COLUMNS = ["speaker_id", "audio_filename", "null", "system_id", "label"]


def load_protocol(txt_path):
    """Read one protocol file into a DataFrame.

    Args:
        txt_path: Path to a space-separated, header-less protocol text file
            with one column per entry of ``PROTOCOL_COLUMNS``.

    Returns:
        DataFrame with the columns ``speaker_id``, ``audio_filename``,
        ``system_id`` and ``label``.
    """
    protocol = pd.read_csv(txt_path, sep=" ", header=None)
    protocol.columns = PROTOCOL_COLUMNS
    return protocol.drop(columns="null")


train_txt_path = "/nas/public/dataset/asvspoof2019/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt"
train_feat_root = "/nas/home/cborrelli/tripletloss_bot/features/logmelspectr/train"
df_train = load_protocol(train_txt_path)

dev_txt_path = "/nas/public/dataset/asvspoof2019/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.dev.trl.txt"
dev_feat_root = "/nas/home/cborrelli/tripletloss_bot/features/logmelspectr/dev"
df_dev = load_protocol(dev_txt_path)

eval_txt_path = "/nas/public/dataset/asvspoof2019/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.eval.trl.txt"
eval_feat_root = "/nas/home/cborrelli/tripletloss_bot/features/logmelspectr/eval"
df_eval = load_protocol(eval_txt_path)

### Data generators

In [None]:
def frame(data, window_length, hop_length):
    """Convert array into a sequence of successive possibly overlapping frames.

    An n-dimensional array of shape (num_samples, ...) is converted into an
    (n+1)-D array of shape (num_frames, window_length, ...), where each frame
    starts hop_length points after the preceding one.

    This is accomplished using stride_tricks, so the original data is not
    copied: the result is a view that shares memory with ``data``. There is no
    zero-padding, so any incomplete frames at the end are not included.

    Args:
        data: np.array of dimension N >= 1.
        window_length: Number of samples in each frame (positive integer).
        hop_length: Advance (in samples) between each window (positive integer).

    Returns:
        (N+1)-D np.array with as many rows as there are complete frames that
        can be extracted. When ``data`` is shorter than ``window_length`` the
        result has zero rows.

    Raises:
        ValueError: If ``window_length`` or ``hop_length`` is not positive.
    """
    if window_length <= 0 or hop_length <= 0:
        raise ValueError(
            "window_length and hop_length must be positive, got "
            "window_length=%r, hop_length=%r" % (window_length, hop_length))

    num_samples = data.shape[0]
    # Floor division keeps the arithmetic in integers. Clamping at zero makes
    # a too-short input yield an empty result instead of handing a negative
    # dimension to as_strided.
    num_frames = max(0, 1 + (num_samples - window_length) // hop_length)
    shape = (num_frames, window_length) + data.shape[1:]
    # Stepping one frame forward skips hop_length rows of the original buffer.
    strides = (data.strides[0] * hop_length,) + data.strides
    return np.lib.stride_tricks.as_strided(data, shape=shape, strides=strides)

class TrainDataGenerator(Sequence):
    """Keras ``Sequence`` that builds random pairs of log-mel examples.

    Every batch is ``([X_0, X_1], y)``. ``X_0`` and ``X_1`` have shape
    ``(batch_size, *dim, n_channels)`` and hold the first and second element
    of each pair. ``y`` has ``batch_size`` entries: 0 for pairs drawn from the
    same class, 1 for pairs drawn from two different classes. Same-class
    pairs come first in the batch.
    """

    def __init__(self, dataframe, feature_path, batch_size=32, dim=(96, 64), n_channels=1,
                 shuffle=True, classes_list=['-', 'A01', 'A02', 'A03', 'A04', 'A05', 'A06'],
                 num_batch_epoch=100):
        """Store the sampling configuration.

        Args:
            dataframe: Table with at least the columns ``system_id`` and
                ``audio_filename``.
            feature_path: Directory holding one ``<audio_filename>.npy`` file
                per row of ``dataframe``.
            batch_size: Number of pairs per batch.
            dim: Shape of one example before the channel axis is appended.
            n_channels: Size of the trailing channel axis.
            shuffle: Accepted for interface compatibility; currently unused.
            classes_list: ``system_id`` values that pairs are drawn from. The
                default list is only read, never mutated.
            num_batch_epoch: Number of batches reported by ``__len__``.
        """
        # Lets the Keras base class set up its own state (newer Keras
        # versions warn when this call is missing).
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.dataframe = dataframe
        self.classes_list = classes_list
        self.n_channels = n_channels
        self.len = num_batch_epoch
        self.feature_path = feature_path

    def __len__(self):
        """Return the number of batches per epoch."""
        return self.len

    def _load_random_example(self, system_id, window_length, hop_length):
        """Return one random example of class ``system_id``.

        A random row of that class is picked, its feature file is loaded and
        framed, and one frame is chosen at random.

        Returns:
            Array of shape ``(window_length, n_features, 1)``.
        """
        row = self.dataframe[self.dataframe.system_id == system_id].sample(n=1)
        log_mel = np.load(os.path.join(self.feature_path, row['audio_filename'].values[0] + '.npy'))
        # Framing runs along axis 0, so the stored array is transposed first
        # (presumably stored as (n_mels, n_frames) -- TODO confirm).
        log_mel = log_mel.transpose()

        if log_mel.shape[0] < self.dim[0]:
            # Zero-pad short clips at the end so at least one full window fits.
            # NOTE(review): the extra +1 row is kept from the original code.
            pad_len = self.dim[0] - log_mel.shape[0] + 1
            log_mel = np.pad(log_mel, ((0, pad_len), (0, 0)))

        frames = frame(log_mel, window_length, hop_length)
        selected_frame = np.random.randint(low=0, high=frames.shape[0])
        return frames[selected_frame][:, :, np.newaxis]

    def __getitem__(self, batch_index):
        """Generate one batch of randomly sampled pairs.

        Args:
            batch_index: Ignored; every call draws a fresh random batch.

        Returns:
            ``([X_0, X_1], y)`` as described in the class docstring.
        """
        negative_couples_classes = np.array(list(itertools.combinations(self.classes_list, r=2)))
        positive_couples_classes = np.array(list(zip(self.classes_list, self.classes_list)))

        # Split so the two halves always add up to batch_size. Using
        # batch_size // 2 for both would leave one row of X_0/X_1
        # uninitialised and make y one element short for odd batch sizes.
        num_positive = self.batch_size // 2
        num_negative = self.batch_size - num_positive

        negative_selected_pairs = negative_couples_classes[
            np.random.choice(negative_couples_classes.shape[0], num_negative, replace=True), :]
        positive_selected_pairs = positive_couples_classes[
            np.random.choice(positive_couples_classes.shape[0], num_positive, replace=True), :]

        selected_pairs = np.concatenate((positive_selected_pairs, negative_selected_pairs), axis=0)
        y = np.concatenate((np.zeros(num_positive), np.ones(num_negative)), axis=0)

        # Window and hop are configured in seconds; convert them to feature frames.
        features_sample_rate = 1.0 / params.STFT_HOP_LENGTH_SECONDS
        example_window_length = int(round(
            params.EXAMPLE_WINDOW_SECONDS * features_sample_rate))
        example_hop_length = int(round(
            params.EXAMPLE_HOP_SECONDS * features_sample_rate))

        X_0 = np.empty((self.batch_size, *self.dim, self.n_channels))
        X_1 = np.empty((self.batch_size, *self.dim, self.n_channels))

        for sample_batch_index, (first_class, second_class) in enumerate(selected_pairs):
            X_0[sample_batch_index] = self._load_random_example(
                first_class, example_window_length, example_hop_length)
            X_1[sample_batch_index] = self._load_random_example(
                second_class, example_window_length, example_hop_length)

        return [X_0, X_1], y

    
    
class TestDataGenerator(Sequence):
    """Keras ``Sequence`` that builds random pairs for one fixed class pair.

    Every batch is ``([X_0, X_1], y)``. ``X_0`` holds examples of
    ``classes_pair[0]`` and ``X_1`` examples of ``classes_pair[1]``, both with
    shape ``(batch_size, *dim, n_channels)``. ``y`` is all zeros when the two
    classes are equal and all ones otherwise.
    """

    def __init__(self, dataframe, feature_path, batch_size=32, dim=(96, 64), n_channels=1,
                 shuffle=True, classes_pair=['-', '-'],
                 num_batch_epoch=100):
        """Store the sampling configuration.

        Args:
            dataframe: Table with at least the columns ``system_id`` and
                ``audio_filename``.
            feature_path: Directory holding one ``<audio_filename>.npy`` file
                per row of ``dataframe``.
            batch_size: Number of pairs per batch.
            dim: Shape of one example before the channel axis is appended.
            n_channels: Size of the trailing channel axis.
            shuffle: Accepted for interface compatibility; currently unused.
            classes_pair: The two ``system_id`` values every pair is built
                from. Repeat the same value to test a single class. The
                default list is only read, never mutated.
            num_batch_epoch: Number of batches reported by ``__len__``.

        Raises:
            ValueError: If ``classes_pair`` does not hold exactly two entries.
        """
        # Fail at construction with a clear message rather than with an
        # IndexError in the middle of prediction.
        if len(classes_pair) != 2:
            raise ValueError(
                "classes_pair must contain exactly two system ids, got %r" % (classes_pair,))

        # Lets the Keras base class set up its own state (newer Keras
        # versions warn when this call is missing).
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.dataframe = dataframe
        self.n_channels = n_channels
        self.len = num_batch_epoch
        self.feature_path = feature_path
        self.classes_pair = classes_pair

    def __len__(self):
        """Return the number of batches per epoch."""
        return self.len

    def _load_random_example(self, system_id, window_length, hop_length):
        """Return one random example of class ``system_id``.

        A random row of that class is picked, its feature file is loaded and
        framed, and one frame is chosen at random.

        Returns:
            Array of shape ``(window_length, n_features, 1)``.
        """
        row = self.dataframe[self.dataframe.system_id == system_id].sample(n=1)
        log_mel = np.load(os.path.join(self.feature_path, row['audio_filename'].values[0] + '.npy'))
        # Framing runs along axis 0, so the stored array is transposed first
        # (presumably stored as (n_mels, n_frames) -- TODO confirm).
        log_mel = log_mel.transpose()

        if log_mel.shape[0] < self.dim[0]:
            # Zero-pad short clips at the end so at least one full window fits.
            # NOTE(review): the extra +1 row is kept from the original code.
            pad_len = self.dim[0] - log_mel.shape[0] + 1
            log_mel = np.pad(log_mel, ((0, pad_len), (0, 0)))

        frames = frame(log_mel, window_length, hop_length)
        selected_frame = np.random.randint(low=0, high=frames.shape[0])
        return frames[selected_frame][:, :, np.newaxis]

    def __getitem__(self, batch_index):
        """Generate one batch of randomly sampled pairs.

        Args:
            batch_index: Ignored; every call draws a fresh random batch.

        Returns:
            ``([X_0, X_1], y)`` as described in the class docstring.
        """
        first_class, second_class = self.classes_pair

        # Window and hop are configured in seconds; convert them to feature frames.
        features_sample_rate = 1.0 / params.STFT_HOP_LENGTH_SECONDS
        example_window_length = int(round(
            params.EXAMPLE_WINDOW_SECONDS * features_sample_rate))
        example_hop_length = int(round(
            params.EXAMPLE_HOP_SECONDS * features_sample_rate))

        X_0 = np.empty((self.batch_size, *self.dim, self.n_channels))
        X_1 = np.empty((self.batch_size, *self.dim, self.n_channels))

        # Label 0 marks a same-class pair, label 1 a different-class pair.
        if first_class == second_class:
            y = np.zeros((self.batch_size))
        else:
            y = np.ones((self.batch_size))

        for sample_batch_index in range(self.batch_size):
            X_0[sample_batch_index] = self._load_random_example(
                first_class, example_window_length, example_hop_length)
            X_1[sample_batch_index] = self._load_random_example(
                second_class, example_window_length, example_hop_length)

        return [X_0, X_1], y

### Load model

In [None]:
# https://gombru.github.io/2019/04/03/ranking_loss/
def contrastive_loss(y_true, y_pred, margin=1):
    """Contrastive loss over predicted pair distances.

    For label 0 the loss is the squared distance; for label 1 it is the
    squared amount by which the distance falls short of ``margin`` (zero once
    the distance reaches the margin). The result is the mean over all
    elements.

    Args:
        y_true: Pair labels, 0 or 1.
        y_pred: Predicted distances.
        margin: Distance beyond which a label-1 pair contributes no loss.
            Defaults to 1, the value previously hard-coded.

    Returns:
        Scalar tensor with the mean loss.
    """
    # Align the label dtype with the predictions so integer labels can be
    # combined with floating-point distances.
    y_true = K.cast(y_true, y_pred.dtype)
    same_class_term = (1 - y_true) * K.square(y_pred)
    different_class_term = y_true * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(same_class_term + different_class_term)

def euclidean_distance(vects):
    """Euclidean distance between two tensors along their last axis.

    Args:
        vects: Pair ``(x, y)`` of tensors to compare.

    Returns:
        Tensor of distances; the reduced axis is kept with size 1.
    """
    left, right = vects
    squared_distance = K.sum(K.square(left - right), axis=-1, keepdims=True)
    # Floor the sum at epsilon so the square root never receives zero.
    floored = K.maximum(squared_distance, K.epsilon())
    return K.sqrt(floored)

In [None]:
# The saved network refers to these two functions by name, so they have to be
# passed to the loader explicitly.
siamese_custom_objects = {
    'contrastive_loss': contrastive_loss,
    'euclidean_distance': euclidean_distance,
}
model = load_model(
    '/nas/home/cborrelli/tripletloss_bot/checkpoints/siamese',
    custom_objects=siamese_custom_objects,
)


In [None]:
# Print a summary of the loaded model as a sanity check before predicting.
model.summary()

### Test model

In [None]:
# Model outputs for pairs built from two different system_id classes
# (A08 vs. A13) of the eval set.
# To use the dev set instead, pass df_dev / dev_feat_root with, for example,
# classes_pair=['A07', 'A01'].
test_generator = TestDataGenerator(
    dataframe=df_eval,
    feature_path=eval_feat_root,
    classes_pair=['A08', 'A13'],
)
predicted_diff = model.predict(test_generator)

In [None]:
# Model outputs for pairs whose two elements share one system_id class
# (A08 vs. A08) of the eval set.
# To use the dev set instead, pass df_dev / dev_feat_root with, for example,
# classes_pair=['A01', 'A01'].
test_generator = TestDataGenerator(
    dataframe=df_eval,
    feature_path=eval_feat_root,
    classes_pair=['A08', 'A08'],
)
predicted_same = model.predict(test_generator)

In [None]:
# Number of predictions returned for the same-class pairs
# (presumably batch_size * num_batch_epoch of the generator -- TODO confirm).
len(predicted_same)

In [None]:
import seaborn as sns

# One histogram (with KDE overlay) per group of predictions. Each figure gets
# a title and an axis label so the two can be told apart when skimming.
sns.displot(predicted_same, kind='hist', kde=True).set(
    title='Same-class pairs', xlabel='Model output')
sns.displot(predicted_diff, kind='hist', kde=True).set(
    title='Different-class pairs', xlabel='Model output')
