# LCNN (Light Convolutional Neural Network)
The front-end features are LFCC and CQCC.<br>An LCNN is used as the back-end classifier.

### Aim
>1. Edit the PyTorch dataset (done)
>2. Code the LCNN architecture
>3. Build the LCNN model
>4. Test

In [74]:
# Library for dataloader
import os.path
import glob
from hdf5storage import loadmat

# Feature extraction
import librosa
import soundfile as sf
import numpy as np
import pandas as pd
import joblib

# Library for pytorch
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision
from torchvision import models, transforms

# Configuration
import random

In [76]:
# Set fixed random seed
# Setting random seeds for reproducibility.
def set_seed(value):
    """
    Seed every random number generator used in this notebook.

    Parameters
    ----------
    value: int
        seed given to Python's `random`, NumPy and PyTorch (CPU and all GPUs)
    """
    np.random.seed(value)
    random.seed(value)
    torch.manual_seed(value)
    torch.cuda.manual_seed(value)
    torch.cuda.manual_seed_all(value)
    torch.backends.cudnn.deterministic = True  # CUDA determinism
    # Autotuning may select a different convolution algorithm on every run,
    # so it is switched off explicitly as well.
    torch.backends.cudnn.benchmark = False


seed = 120
set_seed(seed)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
# Render NumPy floats with exactly four digits after the decimal point.
float_formatter = "{:.4f}".format

np.set_printoptions(formatter=dict(float_kind=float_formatter))

In [3]:
def make_datapath_list(phase, feature_type,
                       feature_root="/home/s1260057/workspace/GT/ASV_anti-spoofing/datasets/",
                       audio_root="/DB/Audio/English/ASVspoof2019/LA/"):
    """
    make a list containing a path to data
    
    Parameters
    ----------
    phase: 'train' or 'dev' or 'eval'
        specify whether data is for train or development or evaluation
    
    feature_type: str
        'CQT', 'LFCC' or 'CQCC' select the feature files stored under
        feature_root/<phase>/<feature_type>/;
        'SPEC' or 'FFT' select the audio files stored under
        audio_root/ASVspoof2019_LA_<phase>/flac/
    
    feature_root: str
        directory holding the pre-computed feature files
    
    audio_root: str
        directory holding the ASVspoof2019 LA audio folders
    
    Returns
    ----------
    path_list : list
        return a sorted list containing a path to data
    
    Raises
    ----------
    AttributeError
        if feature_type is none of the supported names
    """
    
    if feature_type in ('CQT', 'LFCC', 'CQCC'):
        target_path = os.path.join(feature_root, phase, feature_type, '*')
    
    elif feature_type in ('SPEC', 'FFT'):
        target_path = os.path.join(audio_root, 'ASVspoof2019_LA_' + phase, 'flac', '*')
        
    else:
        # Same exception type as before, but the reason now travels with it
        raise AttributeError(
            '[Error: None of feature_types were matched.] Got feature_type={!r}'.format(feature_type)
        )
    
    print(target_path)
    
    # glob returns the matches in arbitrary order; sort them for reproducibility
    return sorted(glob.glob(target_path))

# Quick check: list the raw training audio and the LFCC development features
train_list, dev_list = (
    make_datapath_list(phase=split, feature_type=kind)
    for split, kind in (('train', 'FFT'), ('dev', 'LFCC'))
)

print(len(train_list), len(dev_list))

/DB/Audio/English/ASVspoof2019/LA/ASVspoof2019_LA_train/flac/*
/home/s1260057/workspace/GT/ASV_anti-spoofing/datasets/dev/LFCC/*
25380 24844


In [4]:
class Preprocess(object):
    """
    Preprocessing class for audio data
    
    Brings a feature matrix to a fixed length along its first axis.
    NOTE: padding() and truncate() are still placeholders that return their
    input unchanged, so the output currently keeps the input length.
    
    Attributes:
    ----------
    shape: tuple
        target shape; only shape[0] (the target length) is used
    """
    def __init__(self, shape):
        """
        Parameters
        ----------
        shape: tuple
            target shape of the feature matrix
        """
        self.shape = shape
        
    def __call__(self, d):
        """
        Return d adjusted to the target length
        
        Parameters
        ----------
        d: array-like with a `shape` attribute
            feature matrix whose first axis is compared with self.shape[0]
        
        Returns
        ----------
        the padded matrix if d is shorter than the target,
        the truncated matrix if it is longer, d itself otherwise
        """
        n = self.shape[0] - d.shape[0]
        if n > 0:
            # d is shorter than the target: n entries are missing
            return self.padding(d, n)
        if n < 0:
            # d is longer than the target: -n entries are in excess
            return self.truncate(d, -n)
        return d
    
    def padding(self, x, n):
        # Placeholder: should extend x by n entries along the first axis
        return x
    
    def truncate(self, x, n):
        # Placeholder: should drop n entries of x along the first axis
        return x
    

In [5]:
class FeatureExtractor(object):
    """
    Callable that builds the extractor selected by `feature_type` and runs it

    Attributes:
    ----------
    extractor: object or None
        extractor created by the most recent call (None before the first call)

    feature_type: str
        'LFCC', 'MFCC', 'CQCC', 'FFT' or 'SPEC'
    """
    def __init__(self, feature_type):
        """
        Parameters
        ----------
        feature_type: str
            name of the feature extraction method
        """
        self.feature_type = feature_type
        self.extractor = None

    def __call__(self, y, sr, dynamic=True):
        """
        Extract features from one signal

        Parameters
        ----------
        y: signal handed to the extractor
        sr: sampling rate handed to the extractor
        dynamic: bool
            if True, extract_feature is called with delta=True

        Raises
        ----------
        AttributeError
            if feature_type names no known extractor
        """
        # The extractor classes are resolved only when their entry is selected
        builders = (
            ('LFCC', lambda: LFCC(y, sr)),
            ('MFCC', lambda: MFCC(y, sr)),
            ('CQCC', lambda: CQCC(y, sr)),
            ('FFT', lambda: FFT(y, sr)),
            ('SPEC', lambda: SPEC(y, sr)),
        )

        for name, build in builders:
            if self.feature_type == name:
                self.extractor = build()
                break
        else:
            print('Wrong feature extraction method specified')
            raise AttributeError

        if not dynamic:
            return self.extractor.extract_feature()
        return self.extractor.extract_feature(delta=True)

In [32]:
# Make dataloader
class ASVspoofDataSet(data.Dataset):
    """
    Dataset class for ASVspoof2019, which derived from torch.utils.data.Dataset class
    
    Attributes:
    --------------
    file_list: list
        list containing a path to data
    
    phase: str
        'train' or 'dev' or 'eval'
    
    feature_type: str
        'CQT', 'LFCC', 'CQCC' (feature files) or 'SPEC', 'FFT' (audio files)
    
    preprocess: callable or None
        called as preprocess(d=features) before the tensor conversion
    
    feature_extract: callable or None
        called as feature_extract(y=speech, sr=sr) on loaded audio
    
    label_list: list
        (filename, label) pairs read from the protocol file;
        empty unless detailed_label is True
    """
    
    # Protocol file name of each phase, relative to the protocol directory
    _PROTOCOL_FILES = {
        'train': 'ASVspoof2019.LA.cm.train.trn.txt',
        'dev': 'ASVspoof2019.LA.cm.dev.trl.txt',
        'eval': 'ASVspoof2019.LA.cm.eval.trl.txt',
    }
    
    def __init__(self, file_list, phase, feature_type, preprocess=None, extractor=None, detailed_label=False,
                 protocol_root='/DB/Audio/English/ASVspoof2019/LA/ASVspoof2019_LA_cm_protocols/'):
        """
        Parameters
        ----------
        file_list: list
            list of files to read
        
        phase: str
            specify whether data is for training or development or evaluation('train' or 'dev' or 'eval')
        
        feature_type: str
            kind of data stored in file_list
        
        preprocess: callable or None
            pre-process applied to the features
        
        extractor: callable or None
            feature extractor applied to loaded audio
        
        detailed_label: bool
            if True, read the labels from the protocol file of `phase`
        
        protocol_root: str
            directory containing the protocol files
        """
        self.file_list = file_list
        self.phase = phase
        self.feature_type = feature_type
        self.preprocess = preprocess
        self.feature_extract = extractor
        self.detailed_label = detailed_label
        self.root_path = protocol_root
        # Always defined, so that audio items can be requested without protocol labels
        self.label_list = []
        
        if self.detailed_label:
            protocol_name = self._PROTOCOL_FILES.get(self.phase)
            if protocol_name is None:
                print("Unsupported phase specified. You must pass either phase='train' or 'dev' or 'eval'")
            else:
                self.label_path = os.path.join(self.root_path, protocol_name)
                self.label_list = self._read_protocol(self.label_path, self.phase)
        
        # filename -> label table built once; __getitem__ uses it instead of scanning label_list.
        # For duplicated filenames the last entry wins.
        self._label_map = dict(self.label_list)
    
    @staticmethod
    def _read_protocol(label_path, phase):
        """Read a protocol file into a list of (filename, label) pairs."""
        pairs = []
        with open(label_path, mode='r') as protocols:
            for line in protocols:
                fields = line.split()
                if not fields:
                    continue  # skip blank lines
                if phase == 'train':
                    label = fields[-1]  # key only
                else:
                    label = (fields[0], fields[3], fields[-1])  # speaker_id, system_id, key
                pairs.append((fields[1], label))
        return pairs
    
    @staticmethod
    def _file_stem(path):
        """Return the file name of `path` without directory and extension."""
        # str.rstrip('.flac') would strip any trailing '.', 'f', 'l', 'a', 'c' characters,
        # not the suffix, so the extension is split off instead.
        return os.path.splitext(os.path.basename(path))[0]
    
    def __len__(self): # this is needed to be overrided
        return len(self.file_list)
    
    def __getitem__(self, index): # this is also needed to be overrided
        """
        Get preprocessed data and its label
        
        Returns
        ----------
        (tensor, label), or (None, None) for an audio file without a protocol label
        
        Raises
        ----------
        AttributeError
            if feature_type is not supported
        """
        path = self.file_list[index]
        
        if self.feature_type in ('CQT', 'LFCC', 'CQCC'):
            # load feature matrix; the label is the last '_'-separated part of the file name
            features = loadmat(path)['x_fea']
            label = self._file_stem(path).split('_')[-1]
            
        elif self.feature_type in ('SPEC', 'FFT'):
            label = self._label_map.get(self._file_stem(path))
            if label is None:
                # unlabeled file: skip before decoding the audio
                return None, None
            
            # load audio
            speech, sr = sf.read(path)
            
            if self.feature_extract:
                features = self.feature_extract(y=speech, sr=sr)
            else:
                features = speech
            
        else:
            raise AttributeError(
                '[Error: Unsupported feature_type] Got feature_type={!r}'.format(self.feature_type)
            )
            
        # preprocessing and conversion to a float tensor
        if self.preprocess:
            features = self.preprocess(d=features)
        tensor = torch.from_numpy(features).float()
        
        return tensor, label

# Test

phase = 'eval'

feature_type = 'FFT'

eval_list = make_datapath_list(phase=phase, feature_type=feature_type)
print(len(eval_list))

process = Preprocess(shape=(864, 400, 3))

# Created here, but the dataset below is built with extractor=None,
# so the items it returns are the loaded audio without feature extraction.
extractor = FeatureExtractor(feature_type=feature_type)

asvspoof_eval = ASVspoofDataSet(file_list=eval_list, phase=phase,
        feature_type=feature_type, preprocess=process,
        extractor=None, detailed_label=True)

# Get 10 files and their label
iterations = 10

# Never index past the end of a short file list
for itr in range(min(iterations, len(asvspoof_eval))):
    #print(asvspoof_train.file_list[itr])
    feature, label = asvspoof_eval[itr]
    if feature is None:
        # The dataset returns (None, None) for files without a protocol label
        print("[skipped] no label for:", asvspoof_eval.file_list[itr])
        print()
        continue
    print("60 vectors", feature.T.shape)
    print("audiofile label: ", label)
    print()

/DB/Audio/English/ASVspoof2019/LA/ASVspoof2019_LA_eval/flac/*
71933
60 vectors torch.Size([50888])
audiofile label:  ('LA_0044', 'A10', 'spoof')

60 vectors torch.Size([32986])
audiofile label:  ('LA_0001', 'A15', 'spoof')

60 vectors torch.Size([23846])
audiofile label:  ('LA_0023', 'A11', 'spoof')

60 vectors torch.Size([66346])
audiofile label:  ('LA_0043', 'A09', 'spoof')

60 vectors torch.Size([35489])
audiofile label:  ('LA_0014', 'A14', 'spoof')

60 vectors torch.Size([60312])
audiofile label:  ('LA_0011', 'A11', 'spoof')

60 vectors torch.Size([40884])
audiofile label:  ('LA_0026', 'A17', 'spoof')

60 vectors torch.Size([62487])
audiofile label:  ('LA_0038', 'A10', 'spoof')

60 vectors torch.Size([55968])
audiofile label:  ('LA_0023', 'A12', 'spoof')

60 vectors torch.Size([39633])
audiofile label:  ('LA_0029', 'A09', 'spoof')



In [7]:
# Number of audio files vs. number of protocol entries, one per line
print(len(asvspoof_eval.file_list), len(asvspoof_eval.label_list), sep='\n')

71933
71237


In [None]:
import scipy.signal as signal
import matplotlib.pyplot as plt

# Load one example utterance from the working directory: x is the waveform, fs its sampling rate
x, fs = sf.read('LA_T_1028533.flac')
# Show the shape of the loaded waveform
print(x.shape)

In [None]:
# Time stamp in seconds of every sample (sample index / sampling rate)
xx = np.arange(0, len(x))/fs

# Plot the waveform against time
plt.plot(xx, x)

In [None]:
# Squared magnitude of the lower half (862 bins) of a 1724-point FFT of x
spec = np.abs(np.fft.fft(x, n=1724)[:1724//2])**2

In [None]:
# Display the shape of `spec`
spec.shape

In [None]:
# Duration of 1724 samples in milliseconds at a 16 kHz sampling rate
1724/16000*1000

In [None]:
# Duration of 320 samples in milliseconds at a 16 kHz sampling rate
320/16000*1000

In [None]:
# Spectrogram of x computed with one long Hamming window
n_window = 1724
fx, tx, Sxx = signal.spectrogram(
    x,
    fs,
    window=np.hamming(n_window),
    nperseg=n_window,
    noverlap=n_window*0.0081,
    nfft=n_window,
)

fig = plt.figure(figsize=(12, 8))

# Log-scaled spectrogram in the upper-left panel of a 2x2 grid
plt.subplot(221)
image = plt.pcolormesh(tx, fx, np.log10(Sxx), shading='gouraud')
plt.xlabel('Time [sec]')
plt.ylabel('Frequency [Hz]')

In [None]:
# Display the shape of the spectrogram `Sxx`
Sxx.shape

In [None]:
# Both spectrograms share one configuration:
# 320-sample Hamming window, 50 % overlap, 512-point FFT, magnitude output
stft_args = dict(window=np.hamming(320), nperseg=320, noverlap=320//2, nfft=512, mode='magnitude')

f1, t1, Sxx1 = signal.spectrogram(x, fs, **stft_args)

f2, t2, Sxx2 = signal.spectrogram(x, fs, **stft_args)

fig = plt.figure(figsize=(12, 8))

# Left panel: log10 of the magnitude
plt.subplot(221)
image = plt.pcolormesh(t1, f1, np.log10(Sxx1), shading='gouraud')
plt.xlabel('Time [sec]')
plt.ylabel('Frequency [Hz]')

# Right panel: squared log10 of the magnitude
plt.subplot(222)
plt.pcolormesh(t2, f2, np.log10(Sxx2)**2, shading='gouraud')
plt.xlabel('Time [sec]')
plt.ylabel('Frequency [Hz]')

plt.show()

In [None]:
# Length of x in seconds, assuming a 16 kHz sampling rate (TODO confirm against fs)
print(len(x)/16000)
# Shapes of the two spectrograms
print(Sxx1.shape)
print(Sxx2.shape)

In [None]:
# STFT of x with librosa: 320-sample Hamming window, hop of 160 samples, 512-point FFT, center=False
y = librosa.core.stft(x, win_length=320, hop_length=320//2, n_fft=512, window=np.hamming(320), center=False)

# Display the shape of the STFT matrix
y.shape

In [None]:

fig = plt.figure(figsize=(12, 8))

# Hamming window whose length matches nperseg below
win = np.hamming(320)
# Sampling rate used for the time/frequency axes.
# NOTE(review): 16 kHz is the rate used elsewhere in this notebook; confirm for other data.
sr = 16000

# NOTE(review): assumes asvspoof_train is an ASVspoofDataSet that yields raw
# waveforms (feature_type 'FFT' or 'SPEC' without an extractor) - confirm.
iterations = 8
for itr in range(iterations):
    
    # The dataset returns (tensor, label), or (None, None) for unlabeled files
    waveform, label = asvspoof_train[itr]
    if waveform is None:
        continue
    x = waveform.numpy()
    
    f1, t1, Sxx1 = signal.spectrogram(x, sr, window=win, nperseg=320, noverlap=512//2, nfft=512, mode='magnitude')

    plt.subplot(2, 4, itr+1)
    plt.pcolormesh(t1, f1, np.log10(Sxx1), shading='gouraud')
    plt.xlabel('Time [sec]')
    plt.ylabel('Frequency [Hz]')

plt.show()

In [None]:
import librosa
import librosa.display
import librosa.core

In [None]:
# The dataset returns (tensor, label), or (None, None) for unlabeled files.
# NOTE(review): assumes asvspoof_train yields raw waveforms
# (feature_type 'FFT' or 'SPEC' without an extractor) - confirm.
waveform, _ = asvspoof_train[0]
if waveform is None:
    raise ValueError('The first item of asvspoof_train has no label, so nothing can be plotted')

# librosa works on NumPy arrays, not on torch tensors
x = waveform.numpy()
# NOTE(review): 16 kHz is the rate used elsewhere in this notebook; confirm for other data.
sr = 16000

D = librosa.amplitude_to_db(np.abs(librosa.core.stft(x, win_length=320, hop_length=160, n_fft=512, window=np.hamming(320), center=False)), ref=np.max)

librosa.display.specshow(D, sr=sr, hop_length=160, x_axis='time', y_axis='log')

plt.colorbar(format='%+2.0f dB')
plt.title('Log-frequency power spectrogram')

In [33]:

feature_type = 'FFT'

process = Preprocess(shape=(864, 400, 3))

# Created here, but the datasets below are built with extractor=None
extractor = FeatureExtractor(feature_type=feature_type)

phase = ['train', 'dev', 'eval']

sr = 16000

# For every phase: number of files, number of labeled files, mean and variance
# of the duration in seconds, and the shortest / longest length in samples.
for p in phase:
    
    dlist = make_datapath_list(phase=p, feature_type=feature_type)
    
    print(len(dlist))
    
    dataset = ASVspoofDataSet(file_list=dlist, phase=p, feature_type=feature_type, preprocess=process, extractor=None, detailed_label=True)
    
    # Lengths (in samples) of all labeled utterances; collected in a plain list
    # because np.append would copy the whole array on every iteration.
    lengths = []
    
    for itr in range(len(dlist)):
        x, label = dataset[itr]
        
        if label is None:
            continue
        
        lengths.append(len(x))
    
    print(len(lengths))
    
    if not lengths:
        # Nothing to average; avoid the division by zero below
        print('[Warning: no labeled data found for phase {}]'.format(p))
        print()
        continue
    
    llist = np.array(lengths) / sr
    x_total = sum(lengths)
    x_min, x_max = min(lengths), max(lengths)
    
    x_avg = x_total / len(llist) / sr
    x_var = sum( (llist - x_avg)**2 ) / len(llist)
    print('time_avg', x_avg)
    print('time_var', x_var)
    print(x_min, x_max)
    print()

/DB/Audio/English/ASVspoof2019/LA/ASVspoof2019_LA_train/flac/*
25380
25380
time_avg 3.4258241627265567
time_var 2.0131270544304867
10439 211007

/DB/Audio/English/ASVspoof2019/LA/ASVspoof2019_LA_dev/flac/*
24986
24844
time_avg 3.4781134705562713
time_var 2.1265527199077794
11122 185508

/DB/Audio/English/ASVspoof2019/LA/ASVspoof2019_LA_eval/flac/*
71933
71237
time_avg 3.1076934475764
time_var 2.192651186467914
7519 208409



In [89]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Max-Feature-Map layer
class MFM(nn.Module):
    """
    Max-Feature-Map activation

    A convolution (type == 1) or a linear layer (any other type) produces
    2 * out_channels features; the output is the element-wise maximum of the
    two halves taken along dimension 1.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, type=1):
        super().__init__()

        self.out_channels = out_channels
        doubled = 2 * out_channels

        self.filter = (
            nn.Conv2d(in_channels, doubled, kernel_size=kernel_size, stride=stride, padding=padding)
            if type == 1
            else nn.Linear(in_channels, doubled)
        )

    def forward(self, x):
        # Split dimension 1 into pieces of out_channels and keep the larger value
        halves = torch.split(self.filter(x), self.out_channels, 1)
        return torch.max(halves[0], halves[1])

### end of class MFM(Max-Feature-Map activation)

class Group(nn.Module):
    """
    Two stacked MFM convolutions: a 1x1 stage that keeps the channel count,
    followed by a stage with the requested kernel size, stride and padding.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super().__init__()
        # 1x1 stage: in_channels -> in_channels
        self.conv_a = MFM(in_channels, in_channels, kernel_size=1, stride=1, padding=0)
        # main stage: in_channels -> out_channels
        self.conv = MFM(in_channels, out_channels, kernel_size, stride, padding)

    def forward(self, x):
        return self.conv(self.conv_a(x))

### End of class Group ###

class FC(nn.Module):
    """
    Flatten the input and apply one linear layer

    Parameters
    ----------
    input_size: int
        number of features per sample after flattening
    out_size: int
        number of output features
    """
    
    def __init__(self, input_size, out_size):
        # nn.Module must be initialised before sub-modules can be assigned
        super(FC, self).__init__()
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(input_size, out_size)
    
    def forward(self, x):
        x = self.flatten(x)
        x = self.linear(x)
        return x

class LCNN_4layers(nn.Module):
    """
    Light CNN with four MFM convolution + max-pooling stages and two linear layers

    The network expects inputs of shape (batch, 1, height, width) and returns
    the sigmoid of a 256-dimensional output per sample.

    Parameters
    ----------
    input_shape: tuple
        (height, width) of the input; the default (864, 400) gives the
        54 * 25 * 32 flattened features the network was written for
    """
    
    def __init__(self, input_shape=(864, 400)):
        
        super(LCNN_4layers, self).__init__()
        
        self.CNN = nn.Sequential(
            MFM(in_channels=1, out_channels=32, kernel_size=5, stride=1, padding=2),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            
            MFM(in_channels=32, out_channels=48, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            
            MFM(in_channels=48, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            
            MFM(in_channels=64, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
        )
        
        # The convolutions keep height and width; each of the four ceil-mode
        # 2x2 poolings halves them, rounding up.
        height, width = input_shape
        for _ in range(4):
            height, width = (height + 1) // 2, (width + 1) // 2
        flat_features = height * width * 32  # 32 channels leave the last stage
        
        self.FC1 = nn.Linear(flat_features, 512)
        self.relu = nn.ReLU()
        self.MFM_FC1 = nn.Linear(512, 256)
        # NOTE(review): FC2 is never used in forward(); it is kept so that the
        # attribute and the parameter set of the model do not change.
        self.FC2 = nn.Linear(flat_features, 256)
        #self.MFM_FC1 = MFM(in_channels=512, out_channels=256)
        
    def forward(self, x):
        x = self.CNN(x)
        # flatten everything except the batch dimension
        x = torch.flatten(x, 1)
        x = self.FC1(x)
        x = self.relu(x)
        x = self.MFM_FC1(x)
        return torch.sigmoid(x)
    
    def num_flat_features(self, x):
        """Return the number of features per sample of x."""
        size = x.size()[1:]  # all dimensions except the batch dimension
        num_features = 1
        for s in size:
            num_features *= s
        return num_features
    
    def debug(self, x):
        """Print the shape and the type of x."""
        print(x.shape, type(x))
### End of class LCNN_4layers

In [90]:
# Build the network with its default configuration
model = LCNN_4layers()

In [96]:
# Random batch for a forward-pass check: 32 single-channel inputs of 864 x 400
# NOTE(review): the name `input` shadows the Python builtin; it is kept because
# a later cell passes this variable to the model.
input = torch.randn(32, 1, 864, 400)
# Show the first sample of the batch
print(input[:1])

tensor([[[[ 1.5279, -0.5306, -0.8125,  ..., -0.0663, -0.2144, -0.8342],
          [-1.2053, -1.6711, -1.5218,  ...,  0.9953, -0.0234, -1.1249],
          [-1.5653, -1.6728, -0.5363,  ..., -0.3048, -1.2572,  0.9384],
          ...,
          [-1.2192, -1.4453,  0.5569,  ..., -0.2578, -3.0906,  0.8226],
          [ 1.4315, -0.2452,  0.0921,  ...,  0.0855,  0.4544,  1.2625],
          [-2.4970, -0.5660, -1.8089,  ..., -1.7885,  1.0715,  1.5377]]]])


In [92]:
# Run one forward pass of the model on the random batch
y = model(input)

In [14]:
# Shape check: a 5x5 convolution with padding 1 turns a 5x5 input into 3x3
# maps, so 32 output channels flatten to 32 * 3 * 3 = 288 features per sample.
input = torch.randn(32, 1, 5, 5)
m = nn.Sequential(
    nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, stride=1, padding=1),
    nn.Flatten()
)
output = m(input)
# Show the resulting shape
print(output.shape)

torch.Size([32, 288])