In [None]:
import tensorflow as tf
from google.colab import drive

# Mount the Google Drive directory through the Colab API so that data can be read from
# outside the runtime and trained models can be saved to Google Drive.
drive.mount('/content/gdrive')
tf.test.gpu_device_name()

Mounted at /content/gdrive


'/device:GPU:0'

Code to compute the power spectral density (PSD) and the masking threshold.

In [None]:
import scipy.io.wavfile as wav
import numpy as np
from scipy.fftpack import fft
from scipy.fftpack import ifft
from scipy import signal
import scipy
import librosa

def compute_PSD_matrix(audio, window_size):
    """
    Compute the STFT of ``audio`` and derive a normalized PSD from it.

    The STFT is taken with ``center=False`` and librosa's other defaults,
    scaled by ``sqrt(8/3)`` and divided by ``window_size``. The PSD in dB is
    then shifted so that its maximum equals 96.

    Args:
        audio: waveform passed unchanged to ``librosa.core.stft``.
        window_size: divisor applied to the scaled STFT. It is not forwarded
            to ``stft``, which therefore uses its default FFT size.

    Returns:
        A 7-tuple ``(win, z, PSD, psd_max, max_psd_db, phase, stft)``:
        ``win`` is the scaled STFT, ``z`` is ``|win / window_size|``,
        ``PSD`` is the normalized PSD in dB, ``psd_max`` is ``max(z * z)``,
        ``max_psd_db`` is the maximum of the un-normalized PSD in dB,
        ``phase`` is the phase factor returned by ``librosa.magphase`` and
        ``stft`` is the unscaled STFT.
    """
    # Compute the STFT once and reuse it for both the scaled copy and the phase
    # (the transform is the expensive step; the original ran it twice).
    stft = librosa.core.stft(audio, center=False)
    win = np.sqrt(8.0 / 3.) * stft
    _, phase = librosa.magphase(stft)

    z = abs(win / window_size)
    power = z * z
    psd_max = np.max(power)
    # The tiny epsilon keeps log10 finite for bins with zero power.
    psd = 10 * np.log10(power + 0.0000000000000000001)
    max_psd_db = np.max(psd)
    PSD = 96 - max_psd_db + psd
    return win, z, PSD, psd_max, max_psd_db, phase, stft

def Bark(f):
    """Map a frequency f (in Hz) to its value on the Bark scale."""
    linear_term = 13 * np.arctan(0.00076 * f)
    quadratic_term = 3.5 * np.arctan(pow(f / 7500.0, 2))
    return linear_term + quadratic_term

def quiet(f):
    """Threshold in quiet (SPL) at frequency f in Hz, with 12 subtracted as an offset."""
    khz = f * 0.001
    low_freq_term = 3.64 * pow(khz, -0.8)
    mid_freq_dip = 6.5 * np.exp(-0.6 * pow(khz - 3.3, 2))
    high_freq_term = 0.001 * pow(khz, 4)
    return low_freq_term - mid_freq_dip + high_freq_term - 12

def two_slops(bark_psd, delta_TM, bark_maskee):
    """
    Build one masking-threshold curve per masker using a two-slope spread function.

    Row ``k`` of ``bark_psd`` holds the masker's Bark value in column 0 and its
    level in column 1; ``delta_TM[k]`` is added to that masker's curve.
    Returns a list with one array per masker, each as long as ``bark_maskee``.
    """
    thresholds = []
    for row in range(bark_psd.shape[0]):
        masker_bark = bark_psd[row, 0]
        masker_level = bark_psd[row, 1]

        distance = bark_maskee - masker_bark
        # Index of the first maskee strictly above the masker (0 when there is none).
        split = np.argmax(distance > 0)
        # The slope above the masker depends on how far its level exceeds 40.
        upper_slope = -27 + 0.37 * max(masker_level - 40, 0)

        spread = np.zeros(len(distance))
        spread[:split] = 27 * distance[:split]
        spread[split:] = upper_slope * distance[split:]

        thresholds.append(masker_level + delta_TM[row] + spread)
    return thresholds
    
def compute_th(PSD, barks, ATH, freqs):
    """
    Return the global masking threshold for one PSD vector.

    Args:
        PSD: 1-D array of PSD values in dB, indexed by frequency bin.
        barks: Bark value of every bin (same indexing as ``PSD``).
        ATH: threshold in quiet per bin, in dB (``-inf`` contributes nothing).
        freqs: frequency of every bin, passed to ``quiet``.

    Returns:
        Array with one value per entry of ``barks``: the sum, in the linear
        power domain, of every individual masking threshold plus the
        threshold in quiet.
    """
    # Identification of tonal maskers: the strict local maxima of the PSD.
    length = len(PSD)
    masker_index = signal.argrelextrema(PSD, np.greater)[0]

    # Drop maskers sitting on the first or last bin, because the smoothing below
    # reads the neighbours at index - 1 and index + 1.
    # (The previous code called np.delete(0) / np.delete(length - 1) without the
    # array argument, which raises TypeError whenever the branch is taken.)
    is_interior = (masker_index != 0) & (masker_index != length - 1)
    masker_index = masker_index[is_interior]
    num_local_max = len(masker_index)

    # Treat all the maskers as tonal (conservative way) and smooth the PSD by
    # summing each peak's power with that of its two neighbours.
    p_k = pow(10, PSD[masker_index]/10.)
    p_k_prev = pow(10, PSD[masker_index - 1]/10.)
    p_k_post = pow(10, PSD[masker_index + 1]/10.)
    P_TM = 10 * np.log10(p_k_prev + p_k + p_k_post)

    # bark_psd columns: Bark value, smoothed level P_TM, bin index of the masker.
    _BARK = 0
    _PSD = 1
    _INDEX = 2
    bark_psd = np.zeros([num_local_max, 3])
    bark_psd[:, _BARK] = barks[masker_index]
    bark_psd[:, _PSD] = P_TM
    bark_psd[:, _INDEX] = masker_index

    # Delete the masker that doesn't have the highest PSD within 0.5 Bark around
    # its frequency. bark_psd shrinks while we iterate, hence the bound checks.
    # NOTE(review): after the quiet-threshold deletion the loop falls through to
    # the pairwise comparison without re-checking the 0.5 Bark distance for the
    # rows that shifted into place - confirm this is intended.
    for i in range(num_local_max):
        nxt = i + 1  # renamed from `next`, which shadowed the builtin
        if nxt >= bark_psd.shape[0]:
            break

        while bark_psd[nxt, _BARK] - bark_psd[i, _BARK] < 0.5:
            # masker must be higher than quiet threshold
            if quiet(freqs[int(bark_psd[i, _INDEX])]) > bark_psd[i, _PSD]:
                bark_psd = np.delete(bark_psd, (i), axis=0)
            if nxt == bark_psd.shape[0]:
                break

            if bark_psd[i, _PSD] < bark_psd[nxt, _PSD]:
                bark_psd = np.delete(bark_psd, (i), axis=0)
            else:
                bark_psd = np.delete(bark_psd, (nxt), axis=0)
            if nxt == bark_psd.shape[0]:
                break

    # Compute the individual masking threshold of every surviving masker.
    delta_TM = 1 * (-6.025 - 0.275 * bark_psd[:, _BARK])
    Ts = two_slops(bark_psd, delta_TM, barks)
    Ts = np.array(Ts)

    # Compute the global masking threshold: add powers, not decibels.
    theta_x = np.sum(pow(10, Ts/10.), axis=0) + pow(10, ATH/10.)

    return theta_x

def generate_th(audio, fs, window_size=2048):
    """
    Compute the per-window global masking threshold of an audio signal.

    Args:
        audio: waveform passed to ``compute_PSD_matrix``.
        fs: sampling rate in Hz, used to derive the frequency of each FFT bin.
        window_size: FFT size used for the bin frequencies and for the PSD
            normalization.

    Returns:
        ``(win, z, PSD, theta_xs, psd_max, max_, phase, stft)`` where
        ``theta_xs`` holds one masking-threshold row per window (column of
        ``PSD``) and the remaining values are forwarded unchanged from
        ``compute_PSD_matrix``.
    """
    win, z, PSD, psd_max, max_, phase, stft = compute_PSD_matrix(audio, window_size)

    # Keyword arguments: recent librosa releases reject the positional form.
    freqs = librosa.core.fft_frequencies(sr=fs, n_fft=window_size)
    barks = Bark(freqs)

    # Threshold in quiet: left at -inf (no contribution) up to the first bin
    # whose Bark value exceeds 1, and given by quiet() from that bin onwards.
    ATH = np.zeros(len(barks)) - np.inf
    bark_ind = np.argmax(barks > 1)
    ATH[bark_ind:] = quiet(freqs[bark_ind:])

    # Global masking threshold of each window (one column of PSD per window).
    theta_xs = np.array(
        [compute_th(PSD[:, i], barks, ATH, freqs) for i in range(PSD.shape[1])]
    )
    return win, z, PSD, theta_xs, psd_max, max_, phase, stft

This cell generates the adversarial audio using all frames.

In [None]:
import os
import wave
import librosa
import scipy.io.wavfile as wav
import numpy as np
 
def add_noise(audio_path):
  """
  Add noise shaped by the masking threshold to every frame of a WAV file.

  The masking threshold from ``generate_th`` is converted back to STFT
  magnitudes, combined with the phase of the original signal, inverted with
  ``librosa.core.istft`` and added to the original samples.

  Args:
      audio_path: path of the WAV file to read. The data is reshaped to
          ``(len,)``, so it is presumably mono - TODO confirm.

  Returns:
      The perturbed signal as an ``int16`` array with the same length as the
      input. The sampling rate is not returned.

  Prints the path, the sample count and several intermediate shapes as
  progress output.
  """
  print(audio_path)
  scale = 8. / 3.
  window_size = 2048
  fs, data_true = wav.read(audio_path)
  length = len(data_true)
  data_true = data_true.reshape((length,)).astype(np.float64)
  win_ori, z_ori, PSD, theta_xs, psd_max, max_, phase, stft = generate_th(data_true, fs, window_size)
  print(length)
  print("PSD", PSD.shape)
  print(win_ori.shape)
  print("THETA_XS", theta_xs.shape)

  # Undo the PSD normalization (the 96 dB reference, i.e. 10**9.6, and psd_max)
  # to get back to STFT magnitudes.
  # NOTE(review): compute_PSD_matrix scales the STFT by sqrt(8/3) while this
  # divides by 8/3 - confirm that the smaller amplitude is intended.
  theta_xs_ = theta_xs.transpose()
  a = theta_xs_ * psd_max.reshape([-1, 1, 1])
  b = pow(10., 9.6)
  psd = a / b
  z_ = np.sqrt(psd) / scale * window_size
  print(z_.shape)
  z = z_.reshape((z_.shape[1], z_.shape[2]))

  # Re-use the phase of the original signal for the noise spectrum.
  # NOTE(review): the forward STFT used center=False but istft runs with its
  # defaults - confirm that the reconstructed frames line up with the original.
  istft = z * phase
  y_inv = librosa.core.istft(istft)
  print(y_inv)

  # ndarray.copy() replaces copy.deepcopy: `copy` is not imported before this
  # cell, so the original raised NameError on a fresh kernel.
  data_pad = data_true.copy()
  overlap = min(len(data_pad), len(y_inv))
  data_pad[:overlap] += y_inv[:overlap]

  # Saturate instead of letting the int16 cast wrap around on overflow.
  int16_range = np.iinfo(np.int16)
  data_pad = np.clip(data_pad, int16_range.min, int16_range.max)
  return data_pad.astype(np.int16)


name="sample.wav"
path_noise="OP_all.wav"

print("add noise File Name is ", name)
data_noise = add_noise(name)

# add_noise() does not return the sampling rate and `fs` only exists inside it,
# so read the rate from the input file before writing the result.
fs = wav.read(name)[0]
wav.write(path_noise, fs, data_noise)




This cell generates the adversarial audio using only the important frames. Before running it, you need to generate the all-frames adversarial sample and the list of important frames using frameselection.ipynb.

In [None]:
import numpy as np  
import numpy as np
path_index="index.npy"

# Pass allow_pickle=True directly instead of temporarily monkey-patching np.load:
# the patch changed np.load for the whole kernel and stayed in place if the load failed.
# NOTE: unpickling can execute arbitrary code - only load index files you trust.
index_list = np.load(path_index, allow_pickle=True)

indexlist=index_list.tolist()

In [None]:
from scipy.io import wavfile
import numpy as np
from scipy.fftpack import fft
from scipy.fftpack import ifft
from scipy import signal
import scipy
import copy
import os

# Inputs: the all-frames adversarial sample (path_noise) and the original audio (name).
url_already_audio = path_noise
url_original_audio = name

# Output file for the important-frames adversarial sample.
url = "Important_OP.wav"

original_url = url_original_audio
all_url = url_already_audio
fs, data_true = wavfile.read(original_url)
fs, data_all = wavfile.read(all_url)
data_final = data_true.copy()
data_n = (len(data_true) - 1536) // 512

n = 1
index_ = indexlist
num_frames = n * len(index_)
frame_span = 512 * n

# Replace each listed block of frame_span samples with the samples of the
# all-frames adversarial audio; everything else keeps the original samples.
for item in index_:
  start = item * frame_span
  stop = (item + 1) * frame_span
  data_final[start:stop] = data_all[start:stop]

wavfile.write(url, fs, data_final)
print("============================================================================")

This cell generates the adversarial audio using randomly chosen frames. Before running it, you need to generate the all-frames adversarial sample and the list of important frames using frameselection.ipynb.

In [None]:
import numpy as np  
import numpy as np
path_index="index.npy"

# Pass allow_pickle=True directly instead of temporarily monkey-patching np.load:
# the patch changed np.load for the whole kernel and stayed in place if the load failed.
# NOTE: unpickling can execute arbitrary code - only load index files you trust.
index_list = np.load(path_index, allow_pickle=True)

indexlist=index_list.tolist()

In [None]:
from scipy.io import wavfile
import numpy as np
from scipy.fftpack import fft
from scipy.fftpack import ifft
from scipy import signal
import scipy
import copy
import os

import random  # random.sample is used below but was never imported in the notebook

# Inputs: the all-frames adversarial sample (path_noise) and the original audio (name).
url_already_audio=path_noise
url_original_audio=name

# Output file for the random-frames adversarial sample.
url = "Random_OP.wav"


original_url=url_original_audio
all_url=url_already_audio
fs, data_true = wavfile.read(original_url)
fs, data_all = wavfile.read(all_url)
data_final=copy.deepcopy(data_true)
data_n=(len(data_true)-1536)//512

n=1
index_=indexlist
num_frames=n*len(index_)

# Candidate frames are all frames that are not in the important-frames list.
# A set-based filter replaces repeated list.remove(), which was quadratic and
# raised ValueError for duplicated or out-of-range indices.
excluded = {i*n + j for i in index_ for j in range(n)}
index_all = [i for i in range(data_n) if i not in excluded]

# Draw as many random frames as there are important frames, or take every
# remaining frame when there are not enough. The draw is unseeded, so a
# different set of frames is picked on every run.
if len(index_all) >= num_frames:
  index_random = random.sample(index_all, num_frames)
else:
  index_random = index_all

# Replace each chosen 512-sample block with the all-frames adversarial samples.
for item in index_random:
  data_final[item*512:(item+1)*512] = data_all[item*512:(item+1)*512]

wavfile.write(url, fs, data_final)
print("============================================================================")