In [1]:
import itertools
import os
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as stats
import seaborn as sns
import tqdm
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import normalize

In [2]:
# Lower this kernel's scheduling priority by adding 2 to its niceness.
# os.nice returns the resulting niceness, which is what the cell displays.
# The increment is relative, so re-running this cell raises the niceness again.
os.nice(2)

2

### Load CSV

In [9]:
def _load_protocol(protocol_path):
    """Load one countermeasure protocol file as a DataFrame.

    The file is parsed as space-separated text without a header row. Its five
    fields are named speaker_id, audio_filename, null, system_id and label, and
    the placeholder "null" field is dropped before returning.

    Parameters
    ----------
    protocol_path : str
        Path of the protocol text file.

    Returns
    -------
    pandas.DataFrame
        One row per line of the file, with the columns
        speaker_id, audio_filename, system_id, label (in that order).

    Raises
    ------
    ValueError
        If the file does not contain exactly five space-separated fields
        (raised by the column assignment).
    """
    protocol = pd.read_csv(protocol_path, sep=" ", header=None)
    protocol.columns = ["speaker_id", "audio_filename", "null", "system_id", "label"]
    return protocol.drop(columns="null")


train_txt_path = '../dataset/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt'
dev_txt_path = '../dataset/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.dev.trl.txt'
eval_txt_path = '../dataset/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.eval.trl.txt'

# One frame per split; all three share the same four-column layout, which the
# positional/label-based indexing in the later cells relies on.
df_train = _load_protocol(train_txt_path)
df_dev = _load_protocol(dev_txt_path)
df_eval = _load_protocol(eval_txt_path)

### Params

In [10]:
# Feature-set selector: both values are interpolated into the feature directory
# names and the pickle file names used by the cells below.
# Presumably the FFT length and frame hop used when the bicoherences were
# extracted -- confirm against the extraction script.
nfft, hop_size = 256, 128

### Train features

In [11]:
train_feat_root_path = '../features/bicoherences/train_nfft_{}_hop_size_{}'.format(nfft, hop_size)

# Columns added to df_train, in order: four statistics of the magnitude of the
# loaded array, followed by the same four statistics of its phase.
stat_columns = [
    'mean_mag', 'var_mag', 'skew_mag', 'kurt_mag',
    'mean_phase', 'var_phase', 'skew_phase', 'kurt_phase',
]

# Collect one row of statistics per file and attach the columns in a single
# step afterwards, instead of writing into df_train while iterating over it
# (pandas advises against modifying a frame during iterrows()). A failure
# part-way through therefore leaves df_train untouched.
train_stat_rows = []
for audio_filename in tqdm.tqdm(df_train['audio_filename'], total=df_train.shape[0]):
    # One .npy file per utterance, named after the protocol's audio_filename.
    bicoh = np.load(os.path.join(train_feat_root_path, audio_filename + '.npy'))
    row_stats = []
    for component in (np.abs(bicoh), np.angle(bicoh)):  # magnitude, then phase
        row_stats.extend([
            np.mean(component),
            np.var(component),
            # axis=None: statistic over all elements, not per axis.
            stats.skew(component, axis=None),
            stats.kurtosis(component, axis=None),
        ])
    train_stat_rows.append(row_stats)

# Stored as float64 regardless of the dtype of the loaded arrays; the reshape
# keeps the matrix two-dimensional even when df_train has no rows.
train_stat_matrix = np.asarray(train_stat_rows, dtype=np.float64).reshape(-1, len(stat_columns))
for col_idx, col_name in enumerate(stat_columns):
    df_train[col_name] = train_stat_matrix[:, col_idx]

df_train.to_pickle('../features/bicoherences/dataframes/train_bicoh_stats_nfft_{}_hop_size_{}.pkl'.format(nfft, hop_size))

100%|██████████| 25380/25380 [01:11<00:00, 356.19it/s]


# Reload the cached training statistics instead of recomputing them.
# The file name carries the same '.pkl' suffix that the to_pickle call uses;
# without it the path does not match the file that was written.
df_train = pd.read_pickle('../features/bicoherences/dataframes/train_bicoh_stats_nfft_{}_hop_size_{}.pkl'.format(
    nfft, hop_size))

In [12]:
# Preview only the first rows rather than rendering the whole training frame.
df_train.head(10)

Unnamed: 0,speaker_id,audio_filename,system_id,label,mean_mag,var_mag,skew_mag,kurt_mag,mean_phase,var_phase,skew_phase,kurt_phase
0,LA_0079,LA_T_1138215,-,bonafide,0.171191,0.020350,1.286176,1.490581,0.019793,3.353757,-0.006319,-1.233394
1,LA_0079,LA_T_1271820,-,bonafide,0.212206,0.035456,1.206880,0.753010,0.010239,3.169798,-0.005019,-1.182913
2,LA_0079,LA_T_1272637,-,bonafide,0.192199,0.029513,1.346364,1.548283,0.007696,3.106027,0.001081,-1.071725
3,LA_0079,LA_T_1276960,-,bonafide,0.190471,0.030015,1.461145,1.928813,0.017408,2.707104,0.007092,-0.850293
4,LA_0079,LA_T_1341447,-,bonafide,0.144300,0.017638,1.750225,3.729260,0.009151,3.209099,-0.001014,-1.122370
5,LA_0079,LA_T_1363611,-,bonafide,0.215749,0.022725,1.079326,1.082444,0.015106,3.437437,-0.006955,-1.260984
6,LA_0079,LA_T_1596451,-,bonafide,0.227750,0.037856,1.293568,1.259698,0.026367,3.520126,-0.012694,-1.235622
7,LA_0079,LA_T_1608170,-,bonafide,0.228898,0.026910,1.047411,0.791581,0.011729,2.786679,0.003128,-1.013201
8,LA_0079,LA_T_1684951,-,bonafide,0.243315,0.037251,1.065995,0.458157,0.006613,2.850528,0.002945,-1.101708
9,LA_0079,LA_T_1699801,-,bonafide,0.195669,0.029245,1.314028,1.437135,0.020655,3.175665,-0.004468,-1.130064


### Load dev features

In [13]:
dev_feat_root_path = '../features/bicoherences/dev_nfft_{}_hop_size_{}'.format(nfft, hop_size)

# Columns added to df_dev, in order: four statistics of the magnitude of the
# loaded array, followed by the same four statistics of its phase.
stat_columns = [
    'mean_mag', 'var_mag', 'skew_mag', 'kurt_mag',
    'mean_phase', 'var_phase', 'skew_phase', 'kurt_phase',
]

# Collect one row of statistics per file and attach the columns in a single
# step afterwards, instead of writing into df_dev while iterating over it
# (pandas advises against modifying a frame during iterrows()). A failure
# part-way through therefore leaves df_dev untouched.
dev_stat_rows = []
for audio_filename in tqdm.tqdm(df_dev['audio_filename'], total=df_dev.shape[0]):
    # One .npy file per utterance, named after the protocol's audio_filename.
    bicoh = np.load(os.path.join(dev_feat_root_path, audio_filename + '.npy'))
    row_stats = []
    for component in (np.abs(bicoh), np.angle(bicoh)):  # magnitude, then phase
        row_stats.extend([
            np.mean(component),
            np.var(component),
            # axis=None: statistic over all elements, not per axis.
            stats.skew(component, axis=None),
            stats.kurtosis(component, axis=None),
        ])
    dev_stat_rows.append(row_stats)

# Stored as float64 regardless of the dtype of the loaded arrays; the reshape
# keeps the matrix two-dimensional even when df_dev has no rows.
dev_stat_matrix = np.asarray(dev_stat_rows, dtype=np.float64).reshape(-1, len(stat_columns))
for col_idx, col_name in enumerate(stat_columns):
    df_dev[col_name] = dev_stat_matrix[:, col_idx]

df_dev.to_pickle('../features/bicoherences/dataframes/dev_bicoh_stats_nfft_{}_hop_size_{}.pkl'.format(nfft, hop_size))

100%|██████████| 24844/24844 [01:09<00:00, 357.16it/s]


# Reload the cached dev statistics instead of recomputing them.
# The file name carries the same '.pkl' suffix that the to_pickle call uses;
# without it the path does not match the file that was written.
df_dev = pd.read_pickle('../features/bicoherences/dataframes/dev_bicoh_stats_nfft_{}_hop_size_{}.pkl'.format(
    nfft, hop_size))

### Load eval features

In [14]:
eval_feat_root_path = '../features/bicoherences/eval_nfft_{}_hop_size_{}'.format(nfft, hop_size)

# Columns added to df_eval, in order: four statistics of the magnitude of the
# loaded array, followed by the same four statistics of its phase.
stat_columns = [
    'mean_mag', 'var_mag', 'skew_mag', 'kurt_mag',
    'mean_phase', 'var_phase', 'skew_phase', 'kurt_phase',
]

# Collect one row of statistics per file and attach the columns in a single
# step afterwards, instead of writing into df_eval while iterating over it
# (pandas advises against modifying a frame during iterrows()). A failure
# part-way through therefore leaves df_eval untouched.
eval_stat_rows = []
for audio_filename in tqdm.tqdm(df_eval['audio_filename'], total=df_eval.shape[0]):
    # One .npy file per utterance, named after the protocol's audio_filename.
    bicoh = np.load(os.path.join(eval_feat_root_path, audio_filename + '.npy'))
    row_stats = []
    for component in (np.abs(bicoh), np.angle(bicoh)):  # magnitude, then phase
        row_stats.extend([
            np.mean(component),
            np.var(component),
            # axis=None: statistic over all elements, not per axis.
            stats.skew(component, axis=None),
            stats.kurtosis(component, axis=None),
        ])
    eval_stat_rows.append(row_stats)

# Stored as float64 regardless of the dtype of the loaded arrays; the reshape
# keeps the matrix two-dimensional even when df_eval has no rows.
eval_stat_matrix = np.asarray(eval_stat_rows, dtype=np.float64).reshape(-1, len(stat_columns))
for col_idx, col_name in enumerate(stat_columns):
    df_eval[col_name] = eval_stat_matrix[:, col_idx]

df_eval.to_pickle('../features/bicoherences/dataframes/eval_bicoh_stats_nfft_{}_hop_size_{}.pkl'.format(nfft, hop_size))

100%|██████████| 71237/71237 [03:21<00:00, 353.07it/s]


# Reload the cached eval statistics instead of recomputing them.
# The file name carries the same '.pkl' suffix that the to_pickle call uses;
# without it the path does not match the file that was written.
df_eval = pd.read_pickle('../features/bicoherences/dataframes/eval_bicoh_stats_nfft_{}_hop_size_{}.pkl'.format(
    nfft, hop_size))

### Prepare data

In [None]:
# Feature matrix: the eight statistic columns, selected by name (label slices
# include both end points) so the result does not depend on how many metadata
# columns precede them.
X_train = df_train.loc[:, 'mean_mag':'kurt_phase'].values


In [None]:
# Feature matrix: the eight statistic columns, selected by name (label slices
# include both end points) so the result does not depend on how many metadata
# columns precede them.
X_dev = df_dev.loc[:, 'mean_mag':'kurt_phase'].values


In [None]:
# Feature matrix: the eight statistic columns, selected by name (label slices
# include both end points) so the result does not depend on how many metadata
# columns precede them.
X_eval = df_eval.loc[:, 'mean_mag':'kurt_phase'].values


### Open set RF

In [None]:
# Open-set experiment: for every pair of spoofing systems, relabel that pair as
# a single "unknown" class, train a random forest on the training split and
# plot row-normalised confusion matrices for train, dev and eval.
unknown_number = 2
unknown_label = 7
# An ordered list, not a set: the iteration order of a set of strings can change
# between interpreter runs, which would reshuffle the generated figures.
multiclass_list = ['-', 'A01', 'A02', 'A03', 'A04', 'A05', 'A06']
# Materialised as a list so it can still be inspected after the loop below;
# a bare itertools.combinations object is exhausted by its first pass.
unknown_combinations = list(itertools.combinations(multiclass_list, unknown_number))


for u in unknown_combinations:
    # '-' (the system_id carried by the bonafide rows) is never held out.
    if '-' in u:
        continue

    multiclass_dict = {'-':0, 'A01':1, 'A02':2, 'A03':3, 'A04':4, 'A05':5, 'A06':6}

    # Every held-out system is collapsed onto the shared unknown_label (7).
    for held_out_system in u:
        multiclass_dict[held_out_system] = unknown_label

    y_train_open_set = [multiclass_dict[a] for a in df_train['system_id'].values]
    y_dev_open_set = [multiclass_dict[a] for a in df_dev['system_id'].values]

    # Every eval row is given the unknown label.
    # NOTE(review): this includes any eval rows whose system_id is '-';
    # confirm that scoring those as "unknown" is intended.
    y_eval_open_set = np.full(X_eval.shape[0], unknown_label)

    X = X_train
    y = y_train_open_set

    open_set_clf = RandomForestClassifier(class_weight='balanced', n_jobs=-1, random_state=2)
    open_set_clf.fit(X, y)

    y_predict_dev = open_set_clf.predict(X_dev)
    y_predict_eval = open_set_clf.predict(X_eval)
    y_predict_train = open_set_clf.predict(X_train)

    # Fix the class order explicitly so the three matrices share one layout and
    # the heatmap ticks show the actual class ids instead of row indices.
    class_labels = sorted(set(multiclass_dict.values()))
    cm_train = confusion_matrix(y_train_open_set, y_predict_train, labels=class_labels, normalize='true')
    cm_dev = confusion_matrix(y_dev_open_set, y_predict_dev, labels=class_labels, normalize='true')
    cm_eval = confusion_matrix(y_eval_open_set, y_predict_eval, labels=class_labels, normalize='true')

    fig, axes = plt.subplots(1, 3, figsize=(20, 8))
    held_out_names = " and ".join(u)
    for ax, split_name, split_cm in zip(axes, ("train", "dev", "eval"), (cm_train, cm_dev, cm_eval)):
        ax.set_title("Unknown classes are {} for {}".format(held_out_names, split_name))
        sns.heatmap(split_cm, annot=True, vmin=0, vmax=1,
                    xticklabels=class_labels, yticklabels=class_labels, ax=ax)
        ax.set_xlabel("Predicted label")
        ax.set_ylabel("True label")

    plt.show()
    # Release the figure; this loop creates one per combination.
    plt.close(fig)

In [None]:

# Rebuilt from multiclass_list rather than read from `unknown_combinations`:
# an itertools.combinations iterator that has already been looped over yields
# nothing. Sorting makes the listing independent of the container's iteration order.
list(itertools.combinations(sorted(multiclass_list), unknown_number))

### Binary SVM

In [None]:
# Binary targets for each split: 1 where the protocol label is 'spoof', 0 otherwise.
y_train = [1 if label == 'spoof' else 0 for label in df_train['label'].values]

y_dev = [1 if label == 'spoof' else 0 for label in df_dev['label'].values]

# Derived from df_eval's own labels, so there is exactly one target per eval row.
y_eval = [1 if label == 'spoof' else 0 for label in df_eval['label'].values]


In [None]:
# Binary SVM with scikit-learn's default settings apart from class weighting:
# class_weight='balanced' weights each class inversely to its frequency in y_train.
clf = svm.SVC(class_weight='balanced')
# fit() returns the estimator itself, so the cell output is its repr.
clf.fit(X_train, y_train)

 NFFT = 256, HOP_SIZE = 128
 
 0.89

In [None]:
# Mean accuracy of the binary SVM on the dev split.
# NOTE(review): plain accuracy can look good on imbalanced data; read it
# together with the row-normalised confusion matrix in the next cell.
clf.score(X_dev, y_dev)

In [None]:
y_predict = clf.predict(X_dev)
# normalize='true' divides each row by its number of true samples, so the
# diagonal holds the per-class recall. labels=[0, 1] pins the row/column order
# to the binary encoding (0 = not spoof, 1 = spoof).
cm = confusion_matrix(y_dev, y_predict, labels=[0, 1], normalize='true')

binary_class_names = ['bonafide', 'spoof']
fig, ax = plt.subplots(figsize=(10, 7))
# vmin/vmax fix the colour scale to the full [0, 1] range of a normalised matrix.
sns.heatmap(cm, annot=True, vmin=0, vmax=1,
            xticklabels=binary_class_names, yticklabels=binary_class_names, ax=ax)
ax.set_title("Binary SVM on dev (rows normalised by true class)")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
plt.show()

### Multiclass SVM

In [None]:
# Integer class id for each system_id: '-' -> 0 and 'A01'..'A06' -> 1..6.
multiclass_dict = dict(zip(('-', 'A01', 'A02', 'A03', 'A04', 'A05', 'A06'), range(7)))

# Column 2 of the protocol frames is system_id; a value missing from the
# mapping raises KeyError rather than being silently skipped.
y_train_multiclass = list(map(multiclass_dict.__getitem__, df_train.iloc[:, 2].values))

y_dev_multiclass = list(map(multiclass_dict.__getitem__, df_dev.iloc[:, 2].values))

In [None]:
# Multiclass SVM with scikit-learn's default settings.
# NOTE(review): unlike the binary model, no class_weight is set here -- confirm intended.
mclf = svm.SVC()
# fit() returns the estimator itself, so the cell output is its repr.
mclf.fit(X_train, y_train_multiclass)

In [None]:
y_predict_multiclass = mclf.predict(X_dev)
# Raw counts (no normalisation): rows are true classes, columns are predictions.
cm = confusion_matrix(y_dev_multiclass, y_predict_multiclass)

fig, ax = plt.subplots(figsize=(10, 7))
# fmt='d' prints the integer counts in full; seaborn's default annotation
# format ('.2g') would round them to two significant digits.
sns.heatmap(cm, annot=True, fmt='d', ax=ax)
ax.set_title("Multiclass SVM on dev (counts)")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
plt.show()

In [None]:
# Candidate (nfft, hop_size) settings, paired position by position.
# NOTE(review): this rebinds `nfft` and `hop_size` from the scalars set in the
# Params cell to lists; re-run the Params cell before re-running any cell that
# formats them into a feature path.
nfft = [512, 256, 128, 64]
hop_size = [256, 128, 64, 32]
# A list rather than a bare zip object, so the pairs can be iterated (and
# displayed) more than once.
fft_params = list(zip(nfft, hop_size))