Aujourd'hui, nous allons classer notre ensemble de données en utilisant le Deep Learning.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import random
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
import glob
from scipy.io import wavfile
from tqdm  import tqdm
import pickle
from IPython.display import Audio
import matplotlib.pyplot as plt

dataset_path = "/content/drive/My Drive/IA_MIAGE/google-speech-dataset/"
all_classes = os.listdir(dataset_path)
all_classes.remove('.git')
all_classes.remove('bird') #some outliers in dataset
all_classes.remove('_lab_ressources_')
all_classes.remove('_background_noise_')
chosen_classes = ["three", "tree"]

In [None]:
X_train_waveform = pickle.load(open("/content/drive/My Drive/IA_MIAGE/X_train_waveform.pkl", "rb"))
X_test_waveform = pickle.load(open("/content/drive/My Drive/IA_MIAGE/X_test_waveform.pkl", "rb"))
X_val_waveform = pickle.load(open("/content/drive/My Drive/IA_MIAGE/X_val_waveform.pkl", "rb"))

y_train =  pickle.load(open("/content/drive/My Drive/IA_MIAGE/y_train.pkl", "rb"))
y_test =  pickle.load(open("/content/drive/My Drive/IA_MIAGE/y_test.pkl", "rb"))
y_val =  pickle.load(open("/content/drive/My Drive/IA_MIAGE/y_val.pkl", "rb"))

In [None]:
print(X_train_waveform.shape, y_train.shape)
print(X_val_waveform.shape, y_val.shape)
print(X_test_waveform.shape, y_val.shape)

In [None]:
!pip install sonopy
from sonopy import mfcc_spec

In [None]:
def get_mfcc_from_signal(signal):

  #extract MFCCs features
  single_mfcc = mfcc_spec(signal, 16000, window_stride=(400, 160), fft_size=512, num_filt=20, num_coeffs=13).T

  #dropping first coefficient
  single_mfcc = single_mfcc[1:,:] #keeping only 12 coefficients

  return single_mfcc

In [None]:
X_train_mfcc = [get_mfcc_from_signal(signal) for signal in tqdm(X_train_waveform,position=0)]
X_val_mfcc = [get_mfcc_from_signal(signal) for signal in tqdm(X_val_waveform,position=0)]
X_test_mfcc = [get_mfcc_from_signal(signal) for signal in tqdm(X_test_waveform,position=0)]

In [None]:
X_train_mfcc = np.asarray(X_train_mfcc)
X_val_mfcc = np.asarray(X_val_mfcc)
X_test_mfcc = np.asarray(X_test_mfcc)

Nous commencerons par CNN (Convolutional Neural Networs). Cet algorithme est conçu pour classer les images. Nous utiliserons ensuite les caractéristiques MFCC de nos pistes comme images.

In [None]:
plt.imshow(X_train_mfcc[0], cmap='gray')
plt.show()

CNN attend comme entrée des tableaux de taille nombre_de_lignes, nombre_de_colonnes, nb_canaux. Nos images n'ont qu'un seul canal.

In [None]:
'''
Adding the channel dimension
'''

X_train_mfcc_expanded = np.expand_dims(X_train_mfcc, axis=-1)
print('train tensor shape:',X_train_mfcc_expanded.shape)

X_val_mfcc_expanded = np.expand_dims(X_val_mfcc, axis=-1)
print('validation tensor shape:',X_val_mfcc_expanded.shape)

X_test_mfcc_expanded = np.expand_dims(X_test_mfcc, axis=-1)
print('test tensor shape:',X_test_mfcc_expanded.shape)

nb_classes = len(chosen_classes)


In [None]:
import tensorflow as tf
import keras
nb_classes = 2
y_train = tf.keras.utils.to_categorical(y_train, 2)
y_test = tf.keras.utils.to_categorical(y_test, 2)
y_val = tf.keras.utils.to_categorical(y_val, 2)

print("Original class:", y_test[0], ", Class in one-hot encoding:", y_test[0])

Comme vous l'avez vu dans le TP précédent, le réseau doit connaître la taille de l'entrée. Nous allons créer ici une couche d'entrée
```
mfcc = Input(shape=(12,98,1))
```
Keras fournit également des méthodes pour créer des couches convolutives. 

```
conv_1 = Conv2D(filters=32, kernel_size=(8,15), strides=(1,1), activation= None, padding='same')
```
Nous n'entrerons pas dans les détails pour expliquer tous les paramètres d'une couche convolutive, mais pour l'instant il suffit de savoir que  

*   filters : Nombre de filtres à apprendre dans la couche
*   kernel_size: Taille des filtres

La couche *Flatten* convertit une matrice de n dimensions en un tableau de 1 dimension

Dans le prochain code, nous ajouterons une couche convolutive suivie de quelques couches *Dense* similaires à celles que nous avons vues dans le dernier TD




In [None]:
from keras.layers import Input, Conv2D, BatchNormalization, Activation, MaxPool2D, Flatten, Dropout, Dense
from tensorflow.keras.optimizers import Adam
from keras import Model

#We first define the input shape
mfcc = Input(shape=(12,98,1)) 

#We build the first convolutional layer and feed it the previously defined Input
conv_1 = Conv2D(filters=32, kernel_size=(8,15), strides=(1,1), padding='same', activation="relu", name="conv_1")(mfcc)

# Flattening a tensor means to remove all of the dimensions except for one. This is exactly what the Flatten layer do.
# A flatten operation on a tensor reshapes the tensor to have the shape that is equal to the number of elements contained in tensor non including the batch dimension.
# See summary bellow to see the change of the shape
reshape = Flatten(name="flatten")(conv_1)

# dense layer, same as the one used in MLP
dense_1 = Dense(128, activation = 'relu', name="dense_1")(reshape)

# dropout, sane as the dropout used in MLP
dropout = Dropout(rate = 0.2, name="dropout")(dense_1)

# dense layer, same as the one used in MLP
dense_2 = Dense(128, activation = 'relu', name="dense_2")(dropout)

# dense layer, same as the one used in MLP
output = Dense(2, activation = 'softmax', name="dense_3")(dense_2)

# The line bellow create the model composed by all the layers previously created
model = Model(inputs = mfcc, outputs = output)

Avec la ligne suivante, vous pouvez voir l'architecture de votre réseau

In [None]:
model.summary()

Le réseau a 4,837,666 paramètres à apprendre.

Une technique pour réduire la taille de l'image après la convolution consiste à utiliser une couche *Pooling*. Comme le montre le gif suivant, il divise l'image en grilles et extrait le maximum de chaque grille.

<img src="https://developers.google.com/machine-learning/practica/image-classification/images/maxpool_animation.gif" width="500"/>



In [None]:
mfcc = Input(shape=(12,98,1)) 

conv_1 = Conv2D(filters=32, kernel_size=(8,15), strides=(1,1), padding='same', activation="relu", name="conv_1")(mfcc)

max_pool = MaxPool2D(pool_size=(2, 2))(conv_1)

reshape = Flatten(name="flatten")(max_pool)

dense_1 = Dense(128, activation = 'relu', name="dense_1")(reshape)

dropout = Dropout(rate = 0.2, name="dropout")(dense_1)

output = Dense(2, activation = 'softmax', name="dense_2")(dropout)

model = Model(inputs = mfcc, outputs = output)

model.summary()

En ajoutant une couche MaxPool après la couche convolutionnelle, nous avons réduit la taille de l'image de (12, 98, 32) à (6, 49, 32) et le nombre de paramètres à apprendre de 4 837 666 à 1 224 994.


Maintenant, entraînons notre petit modèle

In [None]:
model.compile(loss='categorical_crossentropy',
              optimizer = Adam(lr=0.003),metrics=['accuracy'])

print("Training ...")

history_cnn = model.fit(X_train_mfcc_expanded,y_train,validation_data = (X_val_mfcc_expanded, y_val),batch_size=64, epochs=5, shuffle=True)

In [None]:
print("Testing ...")

loss, accuracy = model.evaluate(X_test_mfcc_expanded, y_test,verbose = 1)

print('model accuracy:',accuracy)

Vous pouvez également ajouter d'autres couches convolutives, l'une après l'autre. (N'oubliez pas que l'ajout de couches n'est pas toujours une bonne solution.)

In [None]:
mfcc = Input(shape=(12,98,1)) 

conv_1 = Conv2D(filters=32, kernel_size=(8,15), strides=(1,1), padding='same', activation="relu", name="conv_1")(mfcc)

max_pool = MaxPool2D(pool_size=(2, 2))(conv_1)

dropout_1 = Dropout(rate = 0.2, name="dropout_1")(max_pool)

conv_2 = Conv2D(filters=32, kernel_size=(8,15), strides=(1,1), padding='same', activation="relu", name="conv_2")(dropout_1)

reshape = Flatten(name="flatten")(conv_2)

dense_1 = Dense(128, activation = 'relu', name="dense_1")(reshape)

dropout_2 = Dropout(rate = 0.2, name="dropout_2")(dense_1)

output = Dense(2, activation = 'softmax', name="dense_2")(dropout_2)

model = Model(inputs = mfcc, outputs = output)

model.summary()

In [None]:
model.compile(loss='categorical_crossentropy',
              optimizer = Adam(lr=0.003),metrics=['accuracy'])

print("Training ...")

history_cnn = model.fit(X_train_mfcc_expanded,y_train,validation_data = (X_val_mfcc_expanded, y_val),batch_size=64, epochs=5, shuffle=True)

print("Testing ...")

loss, accuracy = model.evaluate(X_test_mfcc_expanded, y_test,verbose = 1)

print('model accuracy:',accuracy)

Créons un modèle plus complexe pour évaluer les autres techniques que nous avons vues pendant le cours.

In [None]:
def CNN():

  mfcc = Input(shape=(12,98,1)) 

  conv_1 = Conv2D(filters=32, kernel_size=(8,15), strides=(1,1), activation= None, padding='same')(mfcc)
  norm_1 = BatchNormalization()(conv_1)
  activ_1 = Activation('relu')(norm_1)

  dropout_1 = Dropout(rate=0.1)(activ_1)

  max_pool = MaxPool2D(pool_size=(2, 2))(dropout_1)

  conv_2 = Conv2D(filters=32, kernel_size=(4,10),strides=(1,1), activation= None,padding='same')(max_pool)
  norm_2 = BatchNormalization()(conv_2)
  activ_2 = Activation('relu')(norm_2)

  dropout_2 = Dropout(rate=0.1)(activ_2)

  reshape = Flatten()(dropout_2)

  dense_1 = Dense(128, activation = 'relu')(reshape)
  dropout_3 = Dropout(rate = 0.2)(dense_1)
  dense_2 = Dense(128, activation = 'relu')(dropout_3)
  output = Dense(2, activation = 'softmax')(dense_2)

  modell = Model(inputs = mfcc, outputs = output)

  return modell

model_cnn = CNN()

model_cnn.compile(loss='categorical_crossentropy',
              optimizer = Adam(lr=0.003),metrics=['accuracy'])

model_cnn.summary()

In [None]:
print("Training ...")
history_cnn = model_cnn.fit(X_train_mfcc_expanded,y_train,validation_data = (X_val_mfcc_expanded, y_val),batch_size=32, epochs=8, shuffle=True)

print("Testing ...")

loss, accuracy = model_cnn.evaluate(X_test_mfcc_expanded, y_test,verbose = 1)

print('model accuracy:',accuracy)

Avec la fonction suivante, vous pouvez tracer la précision de training et validation.

In [None]:
def plot_history(history):
  plt.plot(history.history['accuracy'])
  plt.plot(history.history['val_accuracy'])
  plt.title('model accuracy')
  plt.ylabel('accuracy')
  plt.xlabel('epoch')
  plt.legend(['train', 'val'], loc='upper left')
  plt.show()
  
plot_history(history_cnn)

LSTM est un algorithme d'apprentissage automatique très utile pour les séquences temporelles comme les échantillons audio.

In [None]:
from keras.models import Sequential
from keras.layers import Bidirectional, LSTM, Dropout

model_lstm = Sequential()
model_lstm.add(LSTM(32,input_shape=(98,12)))
model_lstm.add(Dropout(0.2))
model_lstm.add(Dense(2,activation='softmax'))

model_lstm.compile(optimizer="adam",loss='categorical_crossentropy',metrics=['accuracy'])

model_lstm.summary()

In [None]:
print("Training ...")
history_lstm = model_lstm.fit(np.transpose(X_train_mfcc, (0,2,1)),y_train,validation_data = (np.transpose(X_val_mfcc, (0,2,1)), y_val),batch_size=32, epochs=30, shuffle=True)

print("Testing ...")
loss, accuracy = model_lstm.evaluate(np.transpose(X_test_mfcc, (0,2,1)), y_test,verbose = 1)

print('model accuracy:',accuracy)

In [None]:
plot_history(history_lstm)

Nos résultats précédents sont assez bons ; cependant, ce que nous avons fait auparavant n'est que des exemples de jouets. Maintenant, nous allons compliquer le problème pour qu'il soit plus proche des cas d'utilisation réelle :

Nous allons ajouter du bruit à nos échantillons audio car nous voulons que notre modèle fonctionne dans un environnement potentiellement bruyant. Ce processus est appelé "augmentation des données" : dans le scénario du monde réel, nous pouvons avoir un ensemble de données audio prises dans un ensemble limité de conditions. Mais notre application cible peut exister dans une variété de conditions. Nous voulons donner au réseau la possibilité d'ignorer le bruit.

Nous allons maintenant mélanger un signal avec du bruit et comprendre le concept de SNR.

Dans le dataset que vous avez téléchargé dans le premier TD, il y a quelques échantillons de bruit.


In [None]:
def normalize(audio_signal):
  audio_signal = np.array(audio_signal) ##Audio signal as array, in case it's not
  max_value = max(np.absolute(audio_signal)) ## Get the maximum positive value
  norm_signal = audio_signal / max_value
  return norm_signal

def add_padding(audio_signal, desired_len):
  
  length_signal = len(audio_signal)
  if length_signal < desired_len:
    zeros = np.zeros(desired_len)
    start_signal = random.randint(0, desired_len - length_signal) ## Bonne Question: Pourquoi aléatoire?
    zeros[start_signal: start_signal + length_signal] = audio_signal
    return zeros
    
  return audio_signal

def get_signal(signal_path):
  
  rate, sig = wavfile.read(filename=signal_path)
  
  try:
    sig = sig[:, 0]
  except:
    pass
  
  #normalization
  sig = normalize(sig)

  #standardization of sizes
  sig = add_padding(sig,16000)
  
  return sig

In [None]:
noise_path = glob.glob(dataset_path + '_background_noise_/*wav')

noise_lib = [get_signal(p) for p in tqdm(noise_path, position=0)]

Voyons le graphique et écoutons le son de certains des signaux de bruit.

In [None]:
noise = random.choice(noise_path)
noise_wav = get_signal(noise)

plt.figure()
n_name = noise.split('/')[-1].split('.wav')[0]
cut = random.randint(0, len(noise_wav))
plt.plot(noise_wav[cut:cut+16000])
plt.title(f"Noise waveform: {n_name}")
plt.show()

Audio(noise_wav, rate=16000)

**SNR** (Signal to Noise Ratio) ou Rapport signal sur bruit

À ce stade, avant le mixage, nous avons besoin d'un moyen de mesurer la quantité de bruit à ajouter au signal original. Nous voulons contrôler l'intensité sonore et le signal bruyant pour comprendre comment les performances de la classification évoluent pour différents niveaux de bruit. 

La mesure utilisée pour contrôler le niveau de bruit à ajouter ou à retirer du signal est le **(SNR)**. Il s'agit d'une mesure utilisée en science et en ingénierie qui compare le niveau d'un signal souhaité au niveau du bruit de fond. Le SNR est défini comme le rapport entre la puissance du signal et la puissance du bruit, souvent exprimé en décibels. Un rapport supérieur à 1:1 (supérieur à 0 dB) indique qu'il y a plus de signal que de bruit. Le rapport signal/bruit est défini comme le rapport entre la puissance d'un signal (information significative) et la puissance du bruit de fond (signal indésirable) :

$SNR = \frac{P_{signal}}{S_{noise}}$

In [None]:
## Fonctions permettant d'ajouter du bruit aux signaux originaux

def cal_adjusted_rms(clean_rms, snr):
    a = float(snr) / 20
    noise_rms = clean_rms / (10 ** a)
    return noise_rms
  

def cal_rms(amp):
    return np.sqrt(np.mean(np.square(amp), axis=-1))

def add_noise(signal, noise, snr):
    '''
    signal: np.ndarray
    snr: signed integer

    returns -> np.ndarray
    '''

    index = random.randint(0, len(noise) - len(signal))
    noise = noise[index:index+len(signal)]

    clean_rms = cal_rms(signal)

    # take part of noise and:
    noise_rms = cal_rms(noise)

    #noise amplitude
    adjusted_noise_rms = cal_adjusted_rms(clean_rms, snr)

    adjusted_noise_amp = noise * (adjusted_noise_rms / noise_rms)
    mixed_amp = (signal + adjusted_noise_amp)

    max_int16 = np.iinfo(np.int16).max
    if mixed_amp.max(axis=0) > max_int16:
        reduction_rate = max_int16 / mixed_amp.max(axis=0)
        mixed_amp = mixed_amp * (reduction_rate)

    return mixed_amp/ max(np.absolute(mixed_amp))

Essayez maintenant de mélanger les deux sons à des niveaux de SNR différents en changeant la valeur **snr** ci-dessous. N'oubliez pas que plus cette valeur est basse, plus le mélange sera bruyant. Vous pouvez également jouer le son pour avoir un retour direct. (N'oubliez pas de ré-exécuter la cellule à chaque fois que vous changez le snr.)

In [None]:
snr = -5 ## A completer. 
signal = X_train_waveform[0]
mixed = add_noise(signal, noise_wav, snr)

#waveform noise
plt.figure()
plt.subplot(221)
plt.plot(signal)
plt.title(f"Signal")
plt.subplot(222)
plt.plot(mixed)
plt.title(f"Signal and noise mixed at {snr} SNR")
plt.show()
mixed = (mixed*32767).astype(np.int16) 
wavfile.write(dataset_path + '/mixed.wav', 16000, mixed)
Audio(dataset_path + '/mixed.wav')

En gardant cela à l'esprit, nous pouvons augmenter notre ensemble de données et essayer de rendre notre modèle plus robuste au bruit. Pour ce faire, nous sélectionnons au hasard un fichier de bruit dans notre ensemble de données et le mélangeons à l'un des fichiers de l'ensemble de données à un niveau aléatoire de SNR compris entre -5 et +20. 

Cela signifie que notre ensemble de données augmenté aura deux fois plus de fichiers audio que la version propre.


In [None]:
snr = np.arange(-5,+20)
def augment_data(signal, snr_range=snr):

  noise = random.choice(noise_lib)
  snr = random.choice(snr_range)
    
  mixed = add_noise(signal, noise, snr)
    
  return mixed

In [None]:
'''
adding noise
'''
X_train_aug = [augment_data(signal) for signal in tqdm(X_train_waveform,position=0)]
X_test_aug = [augment_data(signal) for signal in tqdm(X_test_waveform,position=0)]
X_val_aug = [augment_data(signal) for signal in tqdm(X_val_waveform,position=0)]

'''
'get mfcc features'
'''
X_train_noisy_mfcc = np.asarray([get_mfcc_from_signal(signal) for signal in tqdm(X_train_aug,position=0)])
X_test_noisy_mfcc = np.asarray([get_mfcc_from_signal(signal) for signal in tqdm(X_test_aug,position=0)])
X_val_noisy_mfcc = np.asarray([get_mfcc_from_signal(signal) for signal in tqdm(X_val_aug,position=0)])

'''
get augmented dataset with clean and noisy data
'''
X_train_noisy = np.concatenate((X_train_noisy_mfcc, X_train_mfcc), axis=0)
X_test_noisy = np.concatenate((X_test_noisy_mfcc, X_test_mfcc), axis=0)
X_val_noisy = np.concatenate((X_val_noisy_mfcc, X_val_mfcc), axis=0)


print('X_train_noisy:',len(X_train_noisy))
print('X_val_noisy:',len(X_val_noisy))
print('X_test_noisy:',len(X_test_noisy))


'''
labels
'''
y_train_noisy = np.array(list(y_train) + list(y_train))
y_val_noisy = np.array(list(y_val) + list(y_val))


print("y_train_noisy:",len(y_train_noisy))
print("y_val_noisy:",len(y_val_noisy))

La fonction suivante vous permet d'évaluer un modèle dans une gamme de valeurs snr.

In [None]:
def accuracy_by_SNR(MODEL, TEST_WAVEFORMS = X_test_waveform, LABELS = y_test,MIN_SNR=-15,MAX_SNR=30,SNR_STEP=5):
  
  SNRs = list(range(MIN_SNR,MAX_SNR+1,)[0::SNR_STEP])
  
  ###
  acc = []
  for SNR in SNRs:
    
    X_noisy_SNR = []
    
    for SIGNAL in TEST_WAVEFORMS:
      
      NOISE = random.choice(noise_lib)
      X_noisy_SNR.append(add_noise(SIGNAL, NOISE, SNR))
    
    X_noisy_SNR_mfcc = [get_mfcc_from_signal(signal) for signal in tqdm(X_noisy_SNR,position=0)]
    X_noisy_SNR_mfcc_expanded = np.expand_dims(X_noisy_SNR_mfcc, axis=-1)  
    
  
    loss, accuracy = MODEL.evaluate(X_noisy_SNR_mfcc_expanded, y_test,verbose=0)
    
    print('SNR:',SNR," - acc: ", accuracy)
    acc.append(accuracy)
    
  return acc, SNRs  

Rappelez-vous que nous avons entraîné un modèle CNN avec notre ensemble de données (contenant uniquement les échantillons propres). Évaluons ce modèle avec ces nouveaux échantillons bruyants. 

In [None]:
acc_without_DA, snrs = accuracy_by_SNR(model_cnn)

plt.figure(figsize=(10,7))
plt.plot(snrs,acc_without_DA,color='blue')
plt.title("Accuracy according to the SNR")
plt.xlabel("SNR")
plt.ylabel("Accuracy")

plt.show()

Maintenant, entraînons dès le début une nouvelle CNN avec la même structure que celle vue précédemment, mais en utilisant les données augmentées. 

In [None]:
model_cnn_noisy = CNN()
model_cnn_noisy.compile(loss='categorical_crossentropy',
              optimizer = Adam(lr=0.003),metrics=['accuracy'])

In [None]:
X_train_noisy_expanded = np.expand_dims(X_train_noisy, axis=-1)
X_val_noisy_expanded = np.expand_dims(X_val_noisy, axis=-1)

print("Training ...")
history_cnn_noisy = model_cnn_noisy.fit(X_train_noisy_expanded,y_train_noisy,validation_data = (X_val_noisy_expanded, y_val_noisy),batch_size=64, epochs=10, shuffle=True)

In [None]:
acc_with_DA, snrs = accuracy_by_SNR(model_cnn_noisy)

plt.figure(figsize=(10,7))
plt.plot(snrs,acc_without_DA)
plt.plot(snrs,acc_with_DA)
plt.title("Accuracy according to the SNR")
plt.xlabel("SNR")
plt.ylabel("Accuracy")
plt.legend(['without data augmentation', 'with data augmentation'], loc='lower right')

plt.show()

Vous pouvez faire le même test avec le modèle LSTM....