#    Notebook para procesar archivos de voz v0080101

In [None]:
#Para descargar videos de Youtube
!pip install pytube

#Para procesamiento de audio, convertir formatos de archivos de audio en otros formatos, convertir tasas de muestreo, para aplicar efectos de sonido, reproducir y grabar archivos de audio.
!apt -qq install -y sox
!apt -qq install -y sox libsox-fmt-mp3
!pip install sox
!git clone https://github.com/rabitt/pysox.git
!cd /content/pysox
!python /content/pysox/setup.py install
!pip install git+https://github.com/rabitt/pysox.git

#convertidor de audio y video. Se usa para convertir WAV a MP3 y viceversa.
!pip3 install ffmpeg
!apt -qq install -y ffmpeg

#Para reproducir, dividir, integrar o editar los archivos de audio únicamente con extensión .wav
!pip3 install pydub


#Para extraer datos de una hoja de cálculo, solo para archivos de extensión .xls
!pip3 install xlrd
!pip3 install --upgrade xlrd

!pip install xlrd
!pip3 install xlrd
!pip install --upgrade --force-reinstall xlrd

#Para manejar y analizar estructuras de datos.
!pip3 install pandas
!pip3 install --upgrade pandas


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import seaborn as sns
import shutil
import tensorflow as tf
import tensorflow_hub as hub


from IPython.display import display, Audio
from pathlib import Path
from scipy import stats
from scipy.io import wavfile
from sklearn.metrics import confusion_matrix
from tensorflow import keras
from google.colab import drive
drive.mount('/content/drive/')


In [None]:
# Variables de sesión
#DATASET_ROOT = "/home/jmmiguez/proyectoAudio/audio/"
DATASET_ROOT = '/content/drive/MyDrive/proyectoAudio/audio'

# Las carpetas en las cuales voy a poner los ejemplos de audio y los ejemplos de ruidos
AUDIO_SUBFOLDER = "audio"
NOISE_SUBFOLDER = "noise"
DATASET_AUDIO_PATH = os.path.join(DATASET_ROOT, AUDIO_SUBFOLDER)
DATASET_NOISE_PATH = os.path.join(DATASET_ROOT, NOISE_SUBFOLDER)

print ("DATASET_ROOT {}".format(DATASET_ROOT))
print ("DATASET_AUDIO_PATH {}".format(DATASET_AUDIO_PATH))
print ("DATASET_NOISE_PATH {}".format(DATASET_NOISE_PATH))

# Porcentaje de muestras que voy a usar para validación
VALID_SPLIT = 0.2

# Semilla que voy a usar para mezclar los datos con el ruido
SHUFFLE_SEED = 43

# La tasa de muestreo a usar es única para todas las muestras de audio.
# Se vuelve a muestrear todo el ruido a esta frecuencia de muestreo.
# Este también será el tamaño de salida de las muestras de las señales de audio.
# (ya que todas las muestras son de 1 segundo de duración)
SAMPLING_RATE = 16000

# El factor a multiplicar por el ruido, es acorde a:
#   muestra_de_ruido = muestreo + ruido * prop * escala
#      donde prop = amplitud_de_muestreo / amplitud_de_ruido
SCALE = 0.5

BATCH_SIZE = 150
#EPOCHS = 100
EPOCHS = 50

In [None]:
# Si la carpeta 'audio' no existe, la creo; caso contrario, no hago nada.
if os.path.exists(DATASET_AUDIO_PATH) is False:
    os.makedirs(DATASET_AUDIO_PATH)

# si la carpeta 'noise' no existe, la creo; caso contrario, no hago nada.
if os.path.exists(DATASET_NOISE_PATH) is False:
    os.makedirs(DATASET_NOISE_PATH)

for folder in os.listdir(DATASET_ROOT):
    if os.path.isdir(os.path.join(DATASET_ROOT, folder)):
        
        print ("folder = {}".format(folder))
        
        if folder in [AUDIO_SUBFOLDER, NOISE_SUBFOLDER]:
            # Si la carpeta es 'audio' o 'noise', no hago nada
            continue
        elif folder in ["other", "_background_noise_"]:
            # Si la sub-carpeta es una de las que contienen muestras de ruido,
            # moverla a la carpeta 'noise'
            shutil.move(
                os.path.join(DATASET_ROOT, folder),
                os.path.join(DATASET_NOISE_PATH, folder),
            )
        else:
            # De otra forma, debe ser una carpeta de un hablante, asique hay que moverla a la carpeta de 'audio'
            shutil.move(
                os.path.join(DATASET_ROOT, folder),
                os.path.join(DATASET_AUDIO_PATH, folder),
            )

In [None]:
# Obtengo la lista de todos los archivos de ruido
noise_paths = []
for subdir in os.listdir(DATASET_NOISE_PATH):
    subdir_path = Path(DATASET_NOISE_PATH) / subdir

    print("subdir_path= {}".format(subdir_path))

    if os.path.isdir(subdir_path):
        noise_paths += [
            os.path.join(subdir_path, filepath)
            for filepath in os.listdir(subdir_path)
            if filepath.endswith(".wav")
        ]

print(
    "En el directorio {}, se encontró {} archivos, pertenecientes a {} directorios".format(
        DATASET_NOISE_PATH, len(noise_paths), len(os.listdir(DATASET_NOISE_PATH))
    )
)

print(noise_paths)

In [None]:
command = (
    "for dir in `ls -1 " + DATASET_NOISE_PATH + "`; do "
    "for file in `ls -1 " + DATASET_NOISE_PATH + "/$dir/*.wav`; do "
    "sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams "
    "$file | grep sample_rate | cut -f2 -d=`; "
    "if [ $sample_rate -ne 16000 ]; then "
    "ffmpeg -hide_banner -loglevel panic -y "
    "-i $file -ar 16000 temp.wav; "
    "mv temp.wav $file; "
    "fi; done; done"
)
os.system(command)

# Dividir el ruido en fragmentos de 16000 Hz cada uno
def load_noise_sample(path):
    
    print(path)
    
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if sampling_rate == SAMPLING_RATE:
        # Número de cortes a 16000 cada uno, que se pueden generar a partir de la muestra de ruido
        slices = int(sample.shape[0] / SAMPLING_RATE)
        sample = tf.split(sample[: slices * SAMPLING_RATE], slices)
        return sample
    else:
        print("La tasa de muestrea para {} es incorrecta. Se ignora".format(path))
        return None


noises = []
for path in noise_paths:
    sample = load_noise_sample(path)
    if sample:
        noises.extend(sample)
noises = tf.stack(noises)

print( "format(len(noise_paths)=           {}".format(len(noise_paths)) )
print( "noises.shape[0]=                   {}".format(noises.shape[0]) )
print( "noises.shape[1]=                   {}".format(noises.shape[1]) )
print( "noises.shape[1] // SAMPLING_RATE = {}".format(noises.shape[1] // SAMPLING_RATE) )

print(
    "{} archivos de ruido fueron divididos en {} muestras de ruido, donde cada una tiene {} seg. de tiempo.".format(
        len(noise_paths),noises.shape[0],noises.shape[1] // SAMPLING_RATE)
)

In [None]:
# Funciones para procesar Audio

def paths_and_labels_to_dataset(audio_paths, labels):
    """Construir un dataset de audios y etiquetas."""
    path_ds = tf.data.Dataset.from_tensor_slices(audio_paths)
    audio_ds = path_ds.map(lambda x: path_to_audio(x))
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_ds, label_ds))


def path_to_audio(path):
    """Leer y decodificar un archivo de audio."""
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1, SAMPLING_RATE)
    return audio


def add_noise(audio, noises=None, scale=0.5):
    if noises is not None:
        # Crear un tensor aleatorio del mismo tamaño que el audio que va 
        # desde 0 hasta la cantidad de muestras de flujo de ruido que tenemos.
        tf_rnd = tf.random.uniform(
            (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
        )
        noise = tf.gather(noises, tf_rnd, axis=0)

        # Obtener la proporción de amplitud entre el audio y el ruido.
        prop = tf.math.reduce_max(audio, axis=1) / tf.math.reduce_max(noise, axis=1)
        prop = tf.repeat(tf.expand_dims(prop, axis=1), tf.shape(audio)[1], axis=1)

        # Agregar el ruido reescalado al audio
        audio = audio + noise * prop * scale

    return audio


def audio_to_fft(audio):
    # Dado que tf.signal.fft aplica FFT en la dimensión más interna, 
    # debemos comprimir las dimensiones y luego expandirlas nuevamente 
    # después de FFT
    audio = tf.squeeze(audio, axis=-1)
    fft = tf.signal.fft(
        tf.cast(tf.complex(real=audio, imag=tf.zeros_like(audio)), tf.complex64)
    )
    fft = tf.expand_dims(fft, axis=-1)

    # Devuelve el valor absoluto de la primera mitad de la FFT 
    # que representa las frecuencias positivas
    return tf.math.abs(fft[:, : (audio.shape[1] // 2), :])


In [None]:
# Obtener las clases y sus etiquetas

class_names = os.listdir(DATASET_AUDIO_PATH)
print("Posibles nombres de nuestras Clases: {}".format(class_names,))
audio_paths = []
labels = []
class_names_process = []

#Reemplazo la recorrida original de archivos de audio, por los nombres de hablantes que están en un excel.
#df = pd.read_excel("/home/jmmiguez/proyectoAudio/audio/DiputadosDebatenElAcuerdoConElFMI3.xls")
#df = pd.read_excel("/home/jmmiguez/proyectoAudio/audio/prueba_004.xls")
df = pd.read_excel("/content/drive/MyDrive/proyectoAudio/audio/prueba_011.xls")

count = df.shape[0]

posicion = 0

for label, name in enumerate(class_names):
#    print("Name={}- Label={}".format(name,label,))

    for index, row in df.iterrows():        
        #print("==>son iguales nombre={} y nombre={} +---- index{}".format(row[2],name,index))

        if (row[2]==name):
            print("Procesando al hablante {}".format(row[2],))
            dir_path = Path(DATASET_AUDIO_PATH) / row[2]    
            speaker_sample_paths = [
                os.path.join(dir_path, filepath)
                for filepath in os.listdir(dir_path)
                if filepath.endswith(".wav")
            ]
            audio_paths += speaker_sample_paths
#            labels += [label] * len(speaker_sample_paths)
            labels += [posicion] * len(speaker_sample_paths)
            posicion = posicion + 1 
#        else: 
#            print("No debo procesar al hablante {}".format(row[2],))
            class_names_process += [name] 

        if index == count - 1:
            break

#for label, name in enumerate(class_names):
#    print("Procesando al hablante {}".format(name,))
#    dir_path = Path(DATASET_AUDIO_PATH) / name
#    speaker_sample_paths = [
#        os.path.join(dir_path, filepath)
#        for filepath in os.listdir(dir_path)
#        if filepath.endswith(".wav")
#    ]
#    audio_paths += speaker_sample_paths
#    labels += [label] * len(speaker_sample_paths)

print("Nombres de nuestras Clases (class_names_process): {}".format(class_names_process,))

print(
    #"Encontrados {} archivos, pertenecientes a {} clases. \nlabels={}".format(len(audio_paths), len(class_names), labels)
    "Encontrados {} archivos, pertenecientes a {} clases. \nlabels={} \naudio_paths={}".format(len(audio_paths), len(class_names_process), labels, audio_paths)
)


In [None]:
# Obtener TRAIN_DS y VALID_DS

# Sobreescribo: Porcentaje de muestras que voy a usar para validación
# originalmente, usaba VALID_SPLIT = 0.1
#VALID_SPLIT = 0.5

# Mezclado
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(labels)

# Dividir entre entrenamiento y validación
num_val_samples = int(VALID_SPLIT * len(audio_paths))
print("Usando {} archivos para entrenamiento.".format(len(audio_paths) - num_val_samples))
train_audio_paths = audio_paths[:-num_val_samples]
train_labels = labels[:-num_val_samples]

print("Usando {} archivos para validación.".format(num_val_samples))
valid_audio_paths = audio_paths[-num_val_samples:]
valid_labels = labels[-num_val_samples:]

# Crear 2 datasets, uno para entrenamiento y otro para validación
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)

#20221113 - prueba mismo shape... para poder usar más métricas.
#valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED).batch(32)
valid_ds = valid_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

# Agrego el ruido al conjunto de entrenamiento
train_ds = train_ds.map(
    lambda x, y: (add_noise(x, noises, scale=SCALE), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)

# Transformo las señales de audio a la frecuencia de dominio, usando 'audio_to_fft'
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
train_ds = train_ds.prefetch(tf.data.AUTOTUNE)

valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
valid_ds = valid_ds.prefetch(tf.data.AUTOTUNE)


In [None]:
# Construir el modelo: layers, activación, optimizador, loss, métricas, callback con early_stopping y checkpoint

def residual_block(x, filters, conv_num=3, activation="relu"):
    # Atajo
    s = keras.layers.Conv1D(filters, 1, padding="same")(x)
    for i in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding="same")(x)
        x = keras.layers.Activation(activation)(x)
    x = keras.layers.Conv1D(filters, 3, padding="same")(x)
    x = keras.layers.Add()([x, s])
    x = keras.layers.Activation(activation)(x)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(x)


def build_model(input_shape, num_classes):
    inputs = keras.layers.Input(shape=input_shape, name="input")

    x = residual_block(inputs, 16, 2)
    x = residual_block(x, 32, 2)
    x = residual_block(x, 64, 3)
    x = residual_block(x, 128, 3)
    x = residual_block(x, 128, 3)

    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dense(128, activation="relu")(x)

    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(x)

    return keras.models.Model(inputs=inputs, outputs=outputs)


#model = build_model((SAMPLING_RATE // 2, 1), len(class_names))
model = build_model((SAMPLING_RATE // 2, 1), len(class_names_process))

model.summary()

#import keras_metrics as km
#from keras import metrics

model.compile(
    optimizer="Adam", 
    loss="sparse_categorical_crossentropy", 
    metrics=["accuracy"]
)

# Agregar devoluciones de llamada:
# 'EarlyStopping' para dejar de entrenar cuando el modelo ya no mejora.
# 'ModelCheckPoint' mantener siempre el modelo que tiene el mejor val_accuracy
model_save_filename = "model.h5"

earlystopping_cb = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(model_save_filename, monitor="val_accuracy", save_best_only=True)


In [None]:
# Entrenar modelo

#sobreescribo EPOCHS = 1
#EPOCHS = 3
#EPOCHS = 50

history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=valid_ds,
    callbacks=[earlystopping_cb, mdlcheckpoint_cb],
)

In [None]:
print(model.evaluate(valid_ds))

In [None]:
#import matplotlib.pyplot as plt

def plot_history(history):
    acc = history.history["accuracy"]
    loss = history.history["loss"]
    val_loss = history.history["val_loss"]
    val_accuracy = history.history["val_accuracy"]
    
    x = range(1, len(acc) + 1)
    
    plt.figure(figsize=(12,5))
    plt.subplot(1, 2, 1)
    plt.plot(x, acc, "b", label="traning_acc")
    plt.plot(x, val_accuracy, "r", label="traning_acc")
    plt.title("Accuracy")
    
    plt.subplot(1, 2, 2)
    plt.plot(x, loss, "b", label="traning_acc")
    plt.plot(x, val_loss, "r", label="traning_acc")
    plt.title("Loss")
  
plot_history(history)

In [None]:
# Matriz de Confusión
# Create x and y tensors

x_valid = None
y_valid = None

for x, y in iter(valid_ds):
    if x_valid is None:
        x_valid = x.numpy()
        y_valid = y.numpy()
    else:
        x_valid = np.concatenate((x_valid, x.numpy()), axis=0)
        y_valid = np.concatenate((y_valid, y.numpy()), axis=0)

# Generate predictions
y_pred = model.predict(x_valid)

# Calculate confusion matrix
confusion_mtx = tf.math.confusion_matrix(
    y_valid, np.argmax(y_pred, axis=-1)
)

# Dibujar la matriz de confusión
plt.figure(figsize=(14, 5))
sns.heatmap(
    confusion_mtx, xticklabels=class_names_process, yticklabels=class_names_process, annot=True, fmt="g"
)
plt.xlabel("Prediction")
plt.ylabel("Label")
plt.title("Validation Confusion Matrix")
plt.show()

for i, label in enumerate(class_names_process):
  if i < confusion_mtx.shape[0]:
    precision = confusion_mtx[i, i] / np.sum(confusion_mtx[:, i])
    recall = confusion_mtx[i, i] / np.sum(confusion_mtx[i, :])
    print("{0:15} Precision:{1:.2f}%; Recall:{2:.2f}%".format(label, precision * 100, recall * 100))
    

In [None]:



#redefino BATCH_SIZE, por si la cantidad de muestras es menor a 128
BATCH_SIZE = 128
SAMPLES_TO_DISPLAY = 10

print("valid_audio_paths={} \n valid_labels={}".format(valid_audio_paths, valid_labels))

test_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)


#print("test_ds={}".format(test_ds[-1]))
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(BATCH_SIZE)
test_ds = test_ds.map(lambda x, y: (add_noise(x, noises, scale=SCALE), y))

for audios, labels in test_ds.take(1):
    # Obtener la señal FFT
    ffts = audio_to_fft(audios)
    # Predecir
    y_pred = model.predict(ffts)
    # Tomar muestras aleatorias
    rnd = np.random.randint(0, BATCH_SIZE, SAMPLES_TO_DISPLAY)
    
   
    audios = audios.numpy()[rnd, :, :]
    labels = labels.numpy()[rnd]
    y_pred = np.argmax(y_pred, axis=-1)[rnd]



    print("labels= {}, y_pred={}".format(labels,y_pred))

    cm=confusion_matrix(labels, y_pred)
    print(cm)
    
    for index in range(SAMPLES_TO_DISPLAY):
        # Para cada muestra, imprimir la etiqueta verdadera y predicha, así como ejecutar la voz con su ruido
        print(
            "Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names_process[labels[index]],
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names_process[y_pred[index]],
            )
        )
        display(Audio(audios[index, :, :].squeeze(), rate=SAMPLING_RATE))

In [None]:
#------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Clasificar a qué categoría corresponde el audio VozViva.wav

BATCH_SIZE = 1
#SAMPLES_TO_DISPLAY = 1

valid_audio_paths_vivo = []
valid_audio_paths_vivo += ["{}".format(DATASET_ROOT + '/audio/VozViva/VozViva.wav')]
valid_labels += []
test_ds = paths_and_labels_to_dataset(valid_audio_paths_vivo, [6])
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(BATCH_SIZE)
test_ds = test_ds.map(lambda x, y: (add_noise(x, noises, scale=SCALE), y))
for audios, labels in test_ds.take(1):
    # Obtener la señal FFT
    ffts = audio_to_fft(audios)
    # Predecir
    y_pred = model.predict(ffts)
    max_pred_value=0
    idx_max_pred_value=0
    name_max_pred_value=""
    pred_name = class_names_process[np.argmax(y_pred)]
    print(pred_name)
    print("0)  -- {} {} ".format(pred_name, y_pred) )
    for index, value in enumerate(y_pred[0]):
        if (value > 0.1):
          print("y_pred({})<={} {}".format(index,value, class_names_process[index]) )
        if(value>max_pred_value):
            max_pred_value=value
            idx_max_pred_value=index
            name_max_pred_value=pred_name
    print("1) proba.cercanía=[{}] orden_categoría={} nombre_categoría={}".format(max_pred_value, idx_max_pred_value, name_max_pred_value) )
    y_pred = np.argmax(y_pred, axis=-1)
    print("2) predicción(Voz en Vivo) = {} ... y_pred={} index={}".format(class_names_process[y_pred[0]], y_pred, index))
    print("3) registro (Voz en Vivo) = {} - {}".format(valid_audio_paths_vivo[0], y_pred,  ) )

    display(Audio( valid_audio_paths_vivo[0], rate=SAMPLING_RATE ) )    

In [None]:
# Dibujar forma de onda y espectrograma del audio: VozViva.wav

# Read the wav file (mono)
samplingFrequency, signalData = wavfile.read('/content/drive/MyDrive/proyectoAudio/audio/audio/VozViva/VozViva.wav')

# Plot the signal read from wav file
plt.rcParams["figure.figsize"] = [10.00, 8.00]
plt.rcParams["figure.autolayout"] = True
plt.subplot(211)
plt.title('Wave')
plt.plot(signalData)
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.subplot(212)
plt.title('Spectrogram')
plt.specgram(signalData,Fs=samplingFrequency)
plt.xlabel('Time')
plt.ylabel('Frequency')
plt.show()