### Preprocesamiento y preparación del dataset.

En primer lugar los audios se convierten a formato mp3 para que tengan el mismo formato que las grabaciones del call center. Después se aplica un filtro pasabanda de 300 a 3300 Hz porque es el rango de frecuencias en el que trabajan los codecs de audio más utilizados para telefonía IP. Por último, a los audios se les calculan los features.
Los casos positivos se incrementan agregando ruido y desplazándolos en tiempo.

#### Imports

In [1]:
import glob
import random
from pydub import AudioSegment,silence
import parselmouth
from parselmouth.praat import call
import librosa
import statistics
import numpy as np
from sklearn.model_selection import train_test_split
from scipy.signal import lfilter, butter
import os


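- ### Features:
    - mfcc  longitud: n_mfcc [0:n_mfcc] (media temporal de cada coeficiente; n_mfcc = 40 en la descripción original, 16 en la ejecución de este notebook)

    - mfcc_delta1  longitud: n_mfcc [n_mfcc:2*n_mfcc]

    - mfcc_delta2  longitud: n_mfcc [2*n_mfcc:3*n_mfcc]

    - meanF0  longitud: 1 [3*n_mfcc]

    - stdevF0  longitud: 1 [3*n_mfcc + 1]

    - meanF0delta  longitud: 1 [3*n_mfcc + 2]

    - hnr  longitud: 1 [3*n_mfcc + 3]

    - crest_factor  longitud: 1 [3*n_mfcc + 4]

    - rms  longitud: 1 [3*n_mfcc + 5]

    - f_means  longitud: 4 (f1, f2, f3, f4) Formantes. No está habilitada porque no funcionaba para todos los archivos (no ocupa posiciones en el vector)

    - f_medians  longitud: 4 (f1, f2, f3, f4) Formantes. No está habilitada porque no funcionaba para todos los archivos (no ocupa posiciones en el vector)

    - spectral_centroid  longitud: 1 (media temporal) [3*n_mfcc + 6]

    - spectral_rollof  longitud: 1 (media temporal) [3*n_mfcc + 7]

    - zero_crossing_rate  longitud: 1 (media temporal) [3*n_mfcc + 8]

    - Longitud total del vector de features: 3*n_mfcc + 9 (57 con n_mfcc = 16; 129 con n_mfcc = 40)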
        

#### Funciones de procesamiento y data augmentation

In [2]:
# Filtro para igualar el ancho de banda del codec de comunicación IP

def butter_params(fs, order=5, low_freq=300, high_freq=3300):
    """Design the Butterworth band-pass filter used to mimic the telephony band.

    Args:
        fs: Sampling rate of the signal, in Hz.
        order: Order passed to ``scipy.signal.butter``.
        low_freq: Lower cut-off frequency in Hz (defaults to 300).
        high_freq: Upper cut-off frequency in Hz (defaults to 3300).

    Returns:
        Tuple ``(b, a)`` with the numerator and denominator coefficients.
    """
    nyq = 0.5 * fs
    # The cut-offs are expressed as a fraction of the Nyquist frequency.
    low = low_freq / nyq
    high = high_freq / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a

def butter_bandpass_filter(audio, fs, order=5):
    """Run ``audio`` through the band-pass filter designed by ``butter_params``.

    Args:
        audio: Signal samples to filter.
        fs: Sampling rate of ``audio`` in Hz.
        order: Filter order forwarded to ``butter_params``.

    Returns:
        The filtered signal produced by ``scipy.signal.lfilter``.
    """
    numerator, denominator = butter_params(fs, order=order)
    return lfilter(numerator, denominator, audio)

# Aumentado del dataset (Agregado de ruido y desplazamiento temporal)

def add_noise(data):
    """Return ``data`` plus Gaussian noise of random amplitude.

    The noise gain is the product of a random factor in [0.12, 0.15)
    (an integer in [80, 100) scaled by 0.15 / 100), a uniform draw in
    [0, 1) and the maximum sample value of ``data``.

    Args:
        data: 1-D NumPy array of samples; it is not modified.

    Returns:
        A new array with the same length as ``data``.
    """
    ceiling = 0.15
    scale = random.randrange(80, 100) / (100 / ceiling)
    gain = scale * np.random.uniform() * np.amax(data)
    noise = np.random.normal(size=data.shape[0])
    return data + gain * noise

def shift_time(audio, sr, max_shift):
    """Shift ``audio`` in time by a random, non-zero number of samples.

    The signal is rolled left or right and the samples that wrapped around
    are replaced with zeros, so the result has the same length as the input.

    Args:
        audio: 1-D array of samples; it is not modified.
        sr: Sampling rate in Hz.
        max_shift: Upper bound (exclusive) of the shift, in seconds.

    Returns:
        A new array containing the shifted signal.

    Raises:
        ValueError: If ``max_shift * sr`` is smaller than 2 samples, in which
            case no non-zero shift below the bound exists.
    """
    # Convert the bound to samples exactly once; the previous version did
    # this inside the retry loop, multiplying by ``sr`` on every retry.
    max_samples = int(max_shift * sr)
    if max_samples < 2:
        raise ValueError(
            "max_shift * sr must be at least 2 samples to allow a non-zero shift"
        )

    # -1 rolls towards the start (left), 1 rolls towards the end (right).
    direction = random.randrange(-1, 2, 2)
    # Drawing from [1, max_samples) gives a non-zero shift without retrying.
    shift = int(np.random.randint(1, max_samples)) * direction

    shifted = np.roll(audio, shift)
    # Silence the samples that np.roll wrapped around to the other end.
    if direction == -1:
        shifted[shift:] = 0
    else:
        shifted[:shift] = 0
    return shifted

def augment_data(audio, sr):
    """Build the two augmented variants of a clip.

    Args:
        audio: 1-D array of samples.
        sr: Sampling rate in Hz.

    Returns:
        Tuple ``(noisy, shifted)``: the clip with added noise and the clip
        shifted in time by at most half of its duration.
    """
    # Half of the clip duration, in seconds, is the bound for the shift.
    half_duration = len(audio) * 0.5 / sr
    return add_noise(audio), shift_time(audio, sr, half_duration)

# Funcion para cargar dataset una vez guardado

def load_dataset(dataset_name, mix=True):
    """Load a dataset file that stores a ``{'x': ..., 'y': ...}`` dict.

    Args:
        dataset_name: Path of the ``.npy`` file to read.
        mix: When true, return a 90/10 train/test split (``random_state=9``);
            otherwise return the stored features and labels untouched.

    Returns:
        ``(x_train, x_test, y_train, y_test)`` when ``mix`` is true,
        ``(X, Y)`` otherwise.
    """
    # NOTE: allow_pickle=True unpickles the file contents; only load files
    # from a trusted source.
    # Indexing the loaded 0-d object array with () unwraps the stored dict.
    stored = np.load(dataset_name, allow_pickle=True)[()]
    X, Y = stored['x'], stored['y']
    if not mix:
        return X, Y
    features_train, features_test, labels_train, labels_test = train_test_split(
        np.array(X), np.array(Y), test_size=0.1, random_state=9
    )
    return features_train, features_test, labels_train, labels_test

#### Funciones de extracción de features

In [3]:
def get_crest_factor_RMS(sound):
    """Compute the crest factor and the mean RMS energy of a signal.

    Args:
        sound: 1-D array of audio samples.

    Returns:
        Tuple ``(crest_factor, rms)`` where ``rms`` is the mean of the
        frame-wise RMS returned by librosa and ``crest_factor`` is the peak
        absolute sample divided by that mean.
    """
    # ``y`` is passed by keyword: recent librosa releases reject positional
    # audio arguments in the feature functions.
    rms = np.mean(librosa.feature.rms(y=sound))
    peak = np.max(np.abs(sound))
    # NOTE(review): an all-zero signal gives rms == 0 and a NaN/inf crest
    # factor; callers are assumed to filter silent clips beforehand.
    crest_factor = peak / rms
    return crest_factor, rms


def measureFormants(sound, f0min, f0max):
    """Measure the first four formants of ``sound`` at its glottal pulses.

    Args:
        sound: Praat sound object accepted by ``parselmouth.praat.call``.
        f0min: Lower pitch bound passed to "To PointProcess (periodic, cc)".
        f0max: Upper pitch bound passed to "To PointProcess (periodic, cc)".

    Returns:
        Tuple ``(f_means, f_medians)``; each is a list ``[f1, f2, f3, f4]``
        with the mean / median of the formant values over all pulses,
        ignoring undefined ("nan") measurements.

    Raises:
        statistics.StatisticsError: If any formant has no valid measurement.
    """
    formant_numbers = (1, 2, 3, 4)

    pulses = call(sound, "To PointProcess (periodic, cc)", f0min, f0max)
    formant_track = call(sound, "To Formant (burg)", 0.0025, 5, 5000, 0.025, 50)
    pulse_count = call(pulses, "Get number of points")

    # Sample every formant only at the glottal pulses (Praat indices start at 1).
    samples = {number: [] for number in formant_numbers}
    for index in range(1, pulse_count + 1):
        t = call(pulses, "Get time from index", index)
        for number in formant_numbers:
            samples[number].append(
                call(formant_track, "Get value at time", number, t, "Hertz", "Linear")
            )

    # Drop the measurements Praat reported as undefined.
    valid = [
        [value for value in samples[number] if str(value) != "nan"]
        for number in formant_numbers
    ]

    # Means first, then medians, one entry per formant.
    f_means = [statistics.mean(values) for values in valid]
    f_medians = [statistics.median(values) for values in valid]
    return f_means, f_medians


def get_mfccs(y, sr, n_mfcc):
    """Return the time-averaged MFCCs and their first and second deltas.

    Args:
        y: 1-D array of audio samples.
        sr: Sampling rate in Hz.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        Tuple ``(mfcc, mfcc_delta1, mfcc_delta2)``; each entry is the mean
        over axis 1 of the corresponding librosa matrix.
    """
    coefficients = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
    first_delta = librosa.feature.delta(coefficients)
    # The second delta is the delta of the first delta.
    second_delta = librosa.feature.delta(first_delta)

    mfcc, mfcc_delta1, mfcc_delta2 = (
        np.mean(matrix, axis=1)
        for matrix in (coefficients, first_delta, second_delta)
    )
    return mfcc, mfcc_delta1, mfcc_delta2


def feature_extraction(audio, sr, f0min, f0max, n_mfcc, unit="Hertz" ):
    """Build the feature vector of one audio clip.

    The returned list is laid out as: ``n_mfcc`` MFCC means, ``n_mfcc``
    delta means, ``n_mfcc`` delta-delta means, then meanF0, stdevF0,
    meanF0delta, hnr, crest_factor, rms, spectral_centroid,
    spectral_rollof and zero_crossing_rate (``3 * n_mfcc + 9`` values).

    Args:
        audio: 1-D array of samples.
        sr: Sampling rate of ``audio`` in Hz.
        f0min: Minimum pitch passed to Praat's "To Harmonicity (cc)".
        f0max: Currently unused; kept so existing callers keep working.
        n_mfcc: Number of MFCC coefficients.
        unit: Currently unused; kept so existing callers keep working.

    Returns:
        The feature vector as a list, or the string ``'skip'`` when the clip
        yields fewer than 9 pitch frames or has no voiced frame.
    """
    # ``sr`` is forwarded so the pitch track matches the clip's real rate
    # instead of librosa's default sampling rate.
    f0, _, _ = librosa.pyin(
        audio,
        fmin=librosa.note_to_hz("C2"),
        fmax=librosa.note_to_hz("C7"),
        sr=sr,
    )
    # Presumably 9 frames are required by librosa.feature.delta's default
    # window width -- TODO confirm.
    if len(f0) < 9:
        return 'skip'

    meanF0 = np.nanmean(f0)
    # A NaN mean means pyin found no voiced frame at all.
    if np.isnan(meanF0):
        return 'skip'
    stdevF0 = np.nanstd(f0)
    # NOTE(review): f0 holds NaN on unvoiced frames, which may propagate
    # through the delta filter; verify meanF0delta is finite on real data.
    meanF0delta = np.nanmean(librosa.feature.delta(f0))

    sound = parselmouth.Sound(audio, sr)
    harmonicity = call(sound, "To Harmonicity (cc)", 0.01, f0min, 0.1, 1.0)
    hnr = call(harmonicity, "Get mean", 0, 0)

    # Formants are disabled because they failed on some files:
    # f_means, f_medians = measureFormants(sound, f0min, f0max)

    mfcc, mfcc_delta1, mfcc_delta2 = get_mfccs(audio, sr, n_mfcc)
    crest_factor, rms = get_crest_factor_RMS(audio)

    # The audio is passed as ``y=``: recent librosa releases reject
    # positional audio arguments in these feature functions.
    spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=sr))
    spectral_rollof = np.mean(
        librosa.feature.spectral_rolloff(y=audio, sr=sr, roll_percent=0.85)
    )
    # zero_crossing_rate takes no sampling rate; the previous positional
    # ``sr`` was interpreted as ``frame_length``.
    zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(y=audio))

    scalars = [
        meanF0,
        stdevF0,
        meanF0delta,
        hnr,
        crest_factor,
        rms,
        spectral_centroid,
        spectral_rollof,
        zero_crossing_rate,
    ]
    output = np.concatenate([mfcc, mfcc_delta1, mfcc_delta2, np.array(scalars)])
    return list(output)




#### Armado del dataset completo

- Se eliminan los audios de menos de 0,5 segundos

In [18]:
def dataset_normalization(dataset, n_mfcc):
    """Standardise a feature matrix to zero mean and unit variance.

    The three MFCC groups (coefficients, deltas, delta-deltas) are each
    normalised with a single mean and standard deviation computed over the
    whole group; every remaining column is normalised independently.

    Args:
        dataset: 2-D array-like of shape ``(samples, features)``; it is
            copied, not modified.
        n_mfcc: Number of MFCC coefficients per group.

    Returns:
        A float NumPy array with the normalised features. Groups or columns
        with zero variance are only centred, so they come out as zeros
        instead of NaN.
    """
    # Force a float copy: integer input would be truncated on assignment.
    dataset = np.array(dataset, dtype=float)

    for n in range(3):
        group = slice(n * n_mfcc, (n + 1) * n_mfcc)
        mean = np.mean(dataset[:, group])
        std = np.std(dataset[:, group])
        if std == 0:
            std = 1.0  # constant group: avoid dividing by zero
        dataset[:, group] = (dataset[:, group] - mean) / std

    N = n_mfcc * 3
    mean = np.mean(dataset[:, N:], axis=0)
    std = np.std(dataset[:, N:], axis=0)
    std = np.where(std == 0, 1.0, std)  # constant columns: avoid 0 / 0
    dataset[:, N:] = (dataset[:, N:] - mean) / std
    return dataset

def _label_from_path(path):
    """Return 1 when the file name marks the clip as anger, otherwise 0."""
    file_name = os.path.basename(path).replace("-", "_")
    # The emotion code is the third "_"-separated field of the file name.
    emotion = file_name.split('_')[2]
    return 1 if emotion in ('05', 'anger.mp3') else 0


def _save_chunk(name, index, X, Y):
    """Write one chunk of features and labels to datasets/<name>_<index>.npy."""
    os.makedirs('datasets', exist_ok=True)
    np.save('datasets/' + name + f'_{index}' + '.npy', {'x': X, 'y': Y})


def process_dataset(directory, name,n_mfcc,n_start, augment=True ):
    """Extract features from every mp3 under ``directory`` and save them in chunks.

    Files are processed in sorted order starting at position ``n_start``.
    Every 100 files the accumulated features/labels are written to
    ``datasets/<name>_<index>.npy`` and the remainder is written when the
    loop finishes. Clips shorter than 0.5 s, and clips for which
    ``feature_extraction`` returns ``'skip'``, are left out.

    Args:
        directory: Folder (with trailing separator) whose sub-folders hold
            the mp3 files.
        name: Prefix of the output chunk files.
        n_mfcc: Number of MFCC coefficients per clip.
        n_start: Number of files to skip at the start, used to resume a run.
        augment: When true, each anger clip also contributes a noisy and a
            time-shifted variant.

    Returns:
        None. The results are written to disk.

    Raises:
        Exception: Any error raised while processing a file is re-raised
            after printing the offending path.
    """
    # NOTE(review): these match the band-pass cut-offs rather than a typical
    # voice pitch range; confirm they are the intended Praat pitch bounds.
    f0min = 300
    f0max = 3300

    # glob order depends on the file system; sorting makes n_start refer to
    # the same files on every run.
    files = sorted(glob.glob(directory + '*/*.mp3'))[n_start:]

    X = []
    Y = []
    path = None
    try:
        for n, path in enumerate(files):
            if n % 1000 == 0:
                print(n)
            if n % 100 == 0 and n != 0:
                # The index is absolute so a resumed run (n_start > 0) does
                # not overwrite the chunks written by an earlier run.
                _save_chunk(name, n_start + n, X, Y)
                X = []
                Y = []

            # librosa has no load_mp3; librosa.load reads mp3 files.
            audio, sr = librosa.load(path)

            # Drop short clips before paying for filtering and extraction.
            if len(audio) / sr < 0.5:
                continue

            audio = butter_bandpass_filter(audio, sr, order=5)
            features = feature_extraction(audio, sr, f0min, f0max,n_mfcc, unit="Hertz" )
            if features == 'skip':
                continue

            emotion = _label_from_path(path)
            X.append(features)
            Y.append(emotion)

            if emotion == 1 and augment:
                for variant in augment_data(audio, sr):
                    variant = butter_bandpass_filter(variant, sr, order=5)
                    variant_features = feature_extraction(variant, sr, f0min, f0max,n_mfcc, unit="Hertz" )
                    # Never store the 'skip' sentinel as a feature vector.
                    if variant_features == 'skip':
                        continue
                    X.append(variant_features)
                    Y.append(emotion)
    except Exception:
        print(path)
        raise

    # Save whatever was accumulated after the last in-loop checkpoint.
    if X:
        _save_chunk(name, n_start + len(files), X, Y)


In [1]:
directory = glob.glob("/home/francoj/Documentos/Reconocimiento de emociones/tesis/Data/*/")
# Process every dataset folder except the first one returned by glob.
# The flag is cleared BEFORE `continue`; it used to be cleared after it,
# which was unreachable and made the loop skip every dataset.
n = 1
for dataset in directory:
    if n == 1:
        n = 0
        continue
    # The folder name (last path component before the trailing "/") is
    # used as the prefix of the output files.
    name = dataset.split("/")[-2]
    process_dataset(dataset, name, 16, 400)
    print('terminado ', name)



NameError: name 'glob' is not defined

In [None]:
# Ad-hoc cell: store features and labels as a dict in 'prueba.npy'.
# NOTE(review): `x` and `y` are not defined in the code shown here; they
# presumably come from an earlier interactive session -- confirm before running.
dataset = {'x': x, 'y': y }
np.save('prueba.npy', dataset)