# TP4 : Reconnaissance de mots isolés par programmation dynamique (DTW)

NOM - PRÉNOM :
Déposer le notebook sur moodle à la fin de la séance.

In [None]:
import numpy as np
from math import floor, inf, dist
import scipy.io.wavfile as wav
import os
from os.path import isfile, join
import matplotlib.pyplot as plt

# Réutilisation de fonctions pour la lecture et la paramétrisation

In [None]:
#!unzip SIGNAL.zip

In [None]:
# Utiliser la fonction "lecture" permettant de lire un fichier son et de connaître ses caractéristiques
# (fréquence d’échantillonnage, durée).

# LECTURE d'un fichier Wave
def lecture(fichier, nb_bits):

    # Charger un fichier audio
    fs, sig = wav.read(fichier)

    # Normaliser le signal entre -1 et 1 pour éviter les dépassements dans les calculs
    signal = sig / 2 ** (nb_bits - 1)

    # Durée du signal (en secondes)
    duree = len(signal) / fs

    return signal, fs, duree


In [None]:
# Utiliser la fonction "parametrisation" qui calcule les paramètres acoustiques
# (coefficients cepstraux) d’un fichier sonore sur des fenêtres de taille "taille_fenetre" avec un
# recouvrement de moitié .

# Calcul de "nbe_coef" coefficients cepstraux
def parametrisation(signal, taille_fenetre, nbe_coef):

    # Initialisation de la matrice résultat
    recouvrement = floor(taille_fenetre / 2)
    nb_fen = floor((np.size(signal) - taille_fenetre) / recouvrement) + 1
    mfcc = np.zeros((nb_fen, nbe_coef))

    # Calcul des MFCC
    for fen in range(nb_fen):
        p = fen * recouvrement
        spectre = abs(np.fft.fft(np.multiply(signal[p:p + taille_fenetre], np.hamming(taille_fenetre))))
        cepstre = np.fft.fft(np.log(spectre))
        cc = cepstre[1:nbe_coef+1].real
        mfcc[fen, :] = cc

    return mfcc


In [None]:
# Ecrire votre fonction "parametrisation_total" (utilisant la fonction "parametrisation")
# afin qu’elle permette de calculer les paramètres pour l’ensemble des fichiers d’un répertoire
# "rep_wav" et de stocker les résultats dans un répertoire "rep_mfcc". Cette fonction renvoie
# la liste des noms de fichiers traités.

# Fonction de paramétrisation totale --> tous les fichiers du répertoire 'WAV/APP'
def parametrisation_total(nb_bits, taille_fenetre, nbe_coef, rep_wav, rep_mfcc):
    isExist = os.path.exists(rep_mfcc)
    if not isExist:
        os.makedirs(rep_mfcc)

    for filename in os.listdir(rep_wav):
        f = os.path.join(rep_wav, filename)
        if os.path.isfile(f):
            print(f)
            signal, fs, duree = lecture(f, nb_bits)

            mfcc_f = parametrisation(signal, taille_fenetre, nbe_coef)
            mfcc_name = rep_mfcc + '/' + os.path.basename(filename)
            mfcc_name = os.path.splitext(mfcc_name)[0]+'.mfcc'
            np.savetxt(mfcc_name, mfcc_f)
    print("MFCC:", mfcc_f.shape)

    return

In [None]:
# Initialisation
q = 16
taille_fenetre = 1024
nbe_coef = 16
rep_ref = 'SIGNAL/REF'
rep_mfcc = 'MFCC'

parametrisation_total(q, taille_fenetre, nbe_coef, rep_ref, rep_mfcc)

SIGNAL/REF/bravo.wav
SIGNAL/REF/delta.wav
SIGNAL/REF/hotel.wav
SIGNAL/REF/echo.wav
SIGNAL/REF/juliett.wav
SIGNAL/REF/foxtrott.wav
SIGNAL/REF/charlie.wav
SIGNAL/REF/india.wav
SIGNAL/REF/alpha.wav
SIGNAL/REF/kilo.wav
SIGNAL/REF/lima.wav
SIGNAL/REF/mike.wav
SIGNAL/REF/golf.wav
MFCC: (25, 16)


# Programmation dynamique

In [None]:
# Ecrire une fonction "dtw" qui prend deux arguments en entrée : la matrice de coefficients
# cepstraux du signal à reconnaître (observation) et la matrice de coefficients cepstraux d'un
# signal de référence. Cette fonction renvoie le coût normalisé.

# Calcul de la DTW entre deux vecteurs

def dtw(mfcc_ref, mfcc_obs):
    g = np.matrix(np.ones((mfcc_obs.shape[0], mfcc_ref.shape[0])) * np.inf)
    g[0,0] = 0

    for i in range(1, mfcc_obs.shape[0]):
        for j in range (1, mfcc_ref.shape[0]):
            dist_ij = np.linalg.norm(mfcc_obs[i,:] - mfcc_ref[j,:])

            g[i,j] = min(g[i-1, j] + dist_ij,
                            g[i-1, j-1] + 2 * dist_ij,
                            g[i, j-1] + dist_ij)

    return g[-1, -1] / (mfcc_obs.shape[0] + mfcc_ref.shape[0])


In [None]:
#import shutil

#shutil.rmtree('MFCC')

ATTENTION IL FAUT GENERER LE MFCC POUR LES REF AVANT MAIS PAS CELUI DES OBS

In [None]:
#Ecrire une fonction "dtw_total" (utilisant la fonction "dtw") qui calcule la DTW sur toutes
# les observations (fichiers Wave) du répertoire "rep_obs" par rapport à toutes les références
# (fichiers MFCC) du répertoire "rep_ref". Cette fonction renvoie une matrice de coûts de taille
# nb_fichier_obs x nb_fichier_ref.

# DTW sur toutes les observations par rapport à toutes les références
def DTW_total(nb_bits, taille_fenetre, nbe_coef, rep_mfcc_ref, rep_wave_obs):
    scores = []
    for filename in sorted(os.listdir(rep_wave_obs)):
        mfccs = []
        f_obs = os.path.join(rep_wave_obs, filename)
        if os.path.isfile(f_obs):
            print("Lecture : ",f_obs)
            signal, fs, duree = lecture(f_obs, nb_bits)

            mfcc_obs = parametrisation(signal, taille_fenetre, nbe_coef)

            for filename in sorted(os.listdir(rep_mfcc)):
                f_ref = os.path.join(rep_mfcc, filename)
                if os.path.isfile(f_ref):
                    print("fref", f_ref)
                    mfcc_ref = np.loadtxt(f_ref)

                    dtw_curr = dtw(mfcc_ref, mfcc_obs)
                    mfccs.append(dtw_curr)
            scores.append(mfccs.copy())

    return np.array(scores)

# Affichages et tests

In [None]:
# Ecrire un programme principal qui lance les fonctions précédentes et affiche pour chaque
# observation (mot inconnu), le mot le plus probable.

# Initialisation
q = 16
taille_fenetre = 1024
nbe_coef = 16
rep_ref = 'SIGNAL/REF'
rep_obs = 'SIGNAL/OBS'
rep_mfcc = 'MFCC'


In [None]:
# Paramétrisation des fichiers références
import shutil

shutil.rmtree('MFCC')
parametrisation_total(q, taille_fenetre, nbe_coef, rep_ref, rep_mfcc)


# Test de la DTW
signal, fs, duree = lecture("SIGNAL/OBS/test3.wav", q)

mfcc_obs = parametrisation(signal, taille_fenetre, nbe_coef)

signal, fs, duree = lecture("SIGNAL/REF/bravo.wav", q)

mfcc_ref = parametrisation(signal, taille_fenetre, nbe_coef)

print(dtw(mfcc_obs=mfcc_obs, mfcc_ref=mfcc_ref))


SIGNAL/REF/bravo.wav
SIGNAL/REF/delta.wav
SIGNAL/REF/hotel.wav
SIGNAL/REF/echo.wav
SIGNAL/REF/juliett.wav
SIGNAL/REF/foxtrott.wav
SIGNAL/REF/charlie.wav
SIGNAL/REF/india.wav
SIGNAL/REF/alpha.wav
SIGNAL/REF/kilo.wav
SIGNAL/REF/lima.wav
SIGNAL/REF/mike.wav
SIGNAL/REF/golf.wav
MFCC: (25, 16)
521.8583766298043


In [None]:
# DTW sur toutes les observations par rapport à chaque référence

# Affichage des couts
mat_dtw = DTW_total(q, taille_fenetre, nbe_coef, rep_mfcc, rep_obs)

Lecture :  SIGNAL/OBS/alpha.wav
['alpha.mfcc', 'bravo.mfcc', 'charlie.mfcc', 'delta.mfcc', 'echo.mfcc', 'foxtrott.mfcc', 'golf.mfcc', 'hotel.mfcc', 'india.mfcc', 'juliett.mfcc', 'kilo.mfcc', 'lima.mfcc', 'mike.mfcc']
fref MFCC/alpha.mfcc
fref MFCC/bravo.mfcc
fref MFCC/charlie.mfcc
fref MFCC/delta.mfcc
fref MFCC/echo.mfcc
fref MFCC/foxtrott.mfcc
fref MFCC/golf.mfcc
fref MFCC/hotel.mfcc
fref MFCC/india.mfcc
fref MFCC/juliett.mfcc
fref MFCC/kilo.mfcc
fref MFCC/lima.mfcc
fref MFCC/mike.mfcc
Lecture :  SIGNAL/OBS/bravo.wav
['alpha.mfcc', 'bravo.mfcc', 'charlie.mfcc', 'delta.mfcc', 'echo.mfcc', 'foxtrott.mfcc', 'golf.mfcc', 'hotel.mfcc', 'india.mfcc', 'juliett.mfcc', 'kilo.mfcc', 'lima.mfcc', 'mike.mfcc']
fref MFCC/alpha.mfcc
fref MFCC/bravo.mfcc
fref MFCC/charlie.mfcc
fref MFCC/delta.mfcc
fref MFCC/echo.mfcc
fref MFCC/foxtrott.mfcc
fref MFCC/golf.mfcc
fref MFCC/hotel.mfcc
fref MFCC/india.mfcc
fref MFCC/juliett.mfcc
fref MFCC/kilo.mfcc
fref MFCC/lima.mfcc
fref MFCC/mike.mfcc
Lecture :  SIGNA

Pour l'eval entre un et deux fichiers marchent tjrs !

In [None]:
# Affichage score final
def display_res(dtw_matrix):
    for i, preds in enumerate(dtw_matrix):
        min_pred = np.min(preds)
        nb_min = np.argmin(preds)

        print("Observation ", i, "=======> ", nb_min, "Score : ", min_pred)

In [None]:
display_res(mat_dtw)

