# **Classification | Emotion Recognition in Speech**

## **Load packages**

In [5]:
import pandas as pd
import os
import shutil
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns


## **Load datasets**

## **Useful functions**

In [3]:
def rename_files(directories: list[str]):

    """

    Description:
        This function renames files, adding the "author_##" and keeping the id of the video

    Args:
        directories (list[str]): array of str, where each str is the local path to a directory, these directories contain videos .mp4
    
    """

    print("Starting renaming process...\n")

    for directory in directories:

        print(f"Renaming process for {directory} started.")
        
        for filename in os.listdir(directory): # archives inside of the curret directory
            if filename.endswith(".wav"): # if the file is a video with an _

                id = filename.split("_")[-1] # extract just the id
                new_filename = f"{directory.split("/")[-1]}_{id}" # concat just the id w '.mp4'

                old_file = os.path.join(directory, filename) # path to the old file
                new_file = os.path.join(directory, new_filename) # path to the new file

                os.rename(old_file, new_file) # rename videos w just its id
        
        print(f"Renaming process for {directory} completed.")

In [10]:

def write_audio_paths_txt(directories: list[str], path: list[str]):

    """

    Description:
        This function renames files, just keeping the id of the video

    Args:
        directories (list[str]): array of str, where each str is the local path to a directory, these directories contain videos .mp4
    
    """

    print("Starting renaming process...\n")

    paths = []

    for directory in range (len(directories)):

        print(f"Writing process for {directories[directory]} started.")
        
        for filename in os.listdir(directories[directory]): # archives inside of the curret directory
            if filename.endswith(".wav"): # if the file is a video with an _

                name = filename # extract just the id
                paths.append(os.path.join(path[directory], f"{name}")) # append the id to the array

    print(paths)

    output_file = f'./txt/paths.txt'
    with open(output_file, 'w') as f:
        for path in paths:
            f.write(f"{path}\n")
        

You can change the location according to your paths

In [11]:
directories = [f'./archive/Actor_{i:02d}' for i in range(1, 25)]
paths = [f'D:/UTEC/IA/PROYECTO3/emotion-recognition-in-speech/archive/Actor_{i:02d}/' for i in range(1, 25)]

In [12]:
os.makedirs("txt", exist_ok=True)

rename_files(directories)

Starting renaming process...

Renaming process for ./archive/Actor_01 started.
Renaming process for ./archive/Actor_01 completed.
Renaming process for ./archive/Actor_02 started.
Renaming process for ./archive/Actor_02 completed.
Renaming process for ./archive/Actor_03 started.
Renaming process for ./archive/Actor_03 completed.
Renaming process for ./archive/Actor_04 started.
Renaming process for ./archive/Actor_04 completed.
Renaming process for ./archive/Actor_05 started.
Renaming process for ./archive/Actor_05 completed.
Renaming process for ./archive/Actor_06 started.
Renaming process for ./archive/Actor_06 completed.
Renaming process for ./archive/Actor_07 started.
Renaming process for ./archive/Actor_07 completed.
Renaming process for ./archive/Actor_08 started.
Renaming process for ./archive/Actor_08 completed.
Renaming process for ./archive/Actor_09 started.
Renaming process for ./archive/Actor_09 completed.
Renaming process for ./archive/Actor_10 started.
Renaming process for 

In [13]:
write_audio_paths_txt(directories, paths)

Starting renaming process...

Writing process for ./archive/Actor_01 started.
Writing process for ./archive/Actor_02 started.
Writing process for ./archive/Actor_03 started.
Writing process for ./archive/Actor_04 started.
Writing process for ./archive/Actor_05 started.
Writing process for ./archive/Actor_06 started.
Writing process for ./archive/Actor_07 started.
Writing process for ./archive/Actor_08 started.
Writing process for ./archive/Actor_09 started.
Writing process for ./archive/Actor_10 started.
Writing process for ./archive/Actor_11 started.
Writing process for ./archive/Actor_12 started.
Writing process for ./archive/Actor_13 started.
Writing process for ./archive/Actor_14 started.
Writing process for ./archive/Actor_15 started.
Writing process for ./archive/Actor_16 started.
Writing process for ./archive/Actor_17 started.
Writing process for ./archive/Actor_18 started.
Writing process for ./archive/Actor_19 started.
Writing process for ./archive/Actor_20 started.
Writing pr

Ahora que tenemos todos los path de los videos almacenados en un `.txt` (separado por train, val y test) podemos usar video_features para realizar la extracción de caracteristicas correspondiente.

Para esto primero clonamos el repositorio de video_features e instalamos las dependencias necesarias dentro de este directorio/repositorio:

```bash
git clone https://github.com/v-iashin/video_features.git
cd video_features

Para realizar la instalación de dependencias, necesitas tener anaconda/miniconda instalado

```bash
conda create -n video_features
conda install pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia
conda install -c conda-forge omegaconf scipy tqdm pytest opencv
conda install -c conda-forge av

Con esto hecho, ya se puede realizar la extracción de caracteristicas en la terminal con el siguiente comando:

```bash

python main.py 
        \ feature_type=vggish
        \ device="cuda:0" 
        \ file_with_video_paths="../txt/paths.txt" 
        \ on_extraction=save_numpy 
        \ output_path="../extraction"

# 'name' could be: [train, test, val]

Ejecutando lo anterior, se crean archivos `.npy` dentro del directorio 'videos'. Cada archivo le corresponde a la extracción de características de un video.

# Preparacion de los datos

## Identificación de shapes en los archivos npy
Se realiza la identificación de los shapes que se forman luego de obtener las caracteristicas de los videos, para encontrar la diferencia de los mismos y se pueda trabajar con una misma dimensionalidad en todos los videos

In [46]:
import os
import numpy as np
from collections import Counter

# Ruta de la carpeta donde están los archivos npy
data_folder = "./extraction/vggish/"

# Obtener la lista de archivos .npy en la carpeta
npy_files = [f for f in os.listdir(data_folder) if f.endswith(".npy")]

# Diccionario para almacenar las formas
shape_counter = Counter()

# Cargar los archivos y contar las formas
for file in npy_files:
    file_path = os.path.join(data_folder, file)
    data = np.load(file_path)
    shape_counter[data.shape] += 1

# Mostrar el conteo de formas únicas
print("Conteo de formas de los archivos .npy:")
for shape, count in shape_counter.items():
    print(f"Forma: {shape} -> Cantidad: {count}")


Conteo de formas de los archivos .npy:
Forma: (3, 128) -> Cantidad: 1054
Forma: (4, 128) -> Cantidad: 379
Forma: (5, 128) -> Cantidad: 7


## Agregar el label
Se agrega el label correspondiente a la emoción para hacer el entrenamiento de los datos y su clasificación, no se modifica los archivos originales de extraction para mantenerlos sin modificaciones y poder utilizarlos para futuros

In [47]:
import os
import numpy as np
import pandas as pd

# Ruta donde están los archivos .npy
data_folder = "./extraction/vggish/"
output_folder = "./label_added_npy/"

# Crear carpeta si no existe
os.makedirs(output_folder, exist_ok=True)

# Diccionario de emociones según el código en el nombre del archivo
emotion_dict = {
    "01": "neutral",
    "02": "calm",
    "03": "happy",
    "04": "sad",
    "05": "angry",
    "06": "fearful",
    "07": "disgust",
    "08": "surprised"
}

# Obtener la lista de archivos .npy
npy_files = [f for f in os.listdir(data_folder) if f.endswith(".npy")]

# Procesar archivos
for file in npy_files:
    parts = file.split('_')  # Separar por guiones bajos "_"
    
    if len(parts) >= 3:  # Asegurar que tiene suficiente estructura
        emotion_code = parts[2].split('-')[2]  # Obtener el cuarto grupo (emoción)
        emotion = emotion_dict.get(emotion_code, "unknown")  # Buscar en el diccionario
        
        # Cargar datos del archivo npy
        data = np.load(os.path.join(data_folder, file))

        # Guardar nuevo archivo con la etiqueta incluida como diccionario
        output_path = os.path.join(output_folder, file)
        np.save(output_path, {"data": data, "label": emotion})

print(f"Archivos transformados guardados en: {output_folder}")


Archivos transformados guardados en: ./label_added_npy/


## Separación por shapes
Se crea una carpeta diferente para cada shape y se guarda cada video en su respectivo folder con su shape indicado para realizar un trabajo más eficiente con los datos

In [48]:
import os
import shutil
import numpy as np
from collections import defaultdict

# Ruta de la carpeta donde están los archivos npy con label incluido
data_folder = "./label_added_npy"
output_folder = "./datasets_by_shape/"

# Crear la carpeta de salida si no existe
os.makedirs(output_folder, exist_ok=True)

# Obtener la lista de archivos .npy en la carpeta
npy_files = [f for f in os.listdir(data_folder) if f.endswith(".npy")]

# Diccionario para agrupar archivos por forma
shape_dict = defaultdict(list)

# Cargar los archivos y organizarlos por forma
for file in npy_files:
    file_path = os.path.join(data_folder, file)
    
    # Cargar el archivo con allow_pickle=True porque ahora es un diccionario
    loaded_data = np.load(file_path, allow_pickle=True).item()
    
    # Acceder a la parte de los datos
    data_array = loaded_data["data"]
    
    # Agrupar por forma
    shape_dict[data_array.shape].append(file_path)

# Copiar los archivos en su respectiva carpeta
for shape, files in shape_dict.items():
    shape_folder = os.path.join(output_folder, f"shape_{'_'.join(map(str, shape))}")
    os.makedirs(shape_folder, exist_ok=True)

    for file_path in files:
        filename = os.path.basename(file_path)
        destination_path = os.path.join(shape_folder, filename)
        shutil.copy(file_path, destination_path)

    print(f"Copiados {len(files)} archivos en: {shape_folder}")


Copiados 1054 archivos en: ./datasets_by_shape/shape_3_128
Copiados 379 archivos en: ./datasets_by_shape/shape_4_128
Copiados 7 archivos en: ./datasets_by_shape/shape_5_128


## Separacion en training y testing para cada uno de los Shapes
Al momento de crear cada uno de los grupos e shapes, separamos cada grupo de datos en los training y testing para poder entrenar el modelo con informacion clara diferenciandose por las emociones y asegurandonos tener data de cada emocion

In [49]:
import os
import shutil
import numpy as np
from sklearn.model_selection import train_test_split

# Ruta de la carpeta donde están los archivos organizados por forma
base_folder = "./datasets_by_shape/"

# Recorrer todas las carpetas de forma
for shape_folder in os.listdir(base_folder):
    shape_path = os.path.join(base_folder, shape_folder)
    
    if not os.path.isdir(shape_path):  # Saltar archivos que no sean carpetas
        continue

    # Crear nueva carpeta de salida con el prefijo "training_"
    new_folder = os.path.join(base_folder, f"training_{shape_folder}")
    os.makedirs(new_folder, exist_ok=True)

    # Crear subcarpetas train y test dentro de la nueva carpeta
    train_folder = os.path.join(new_folder, "train")
    val_folder = os.path.join(new_folder, "test")
    os.makedirs(train_folder, exist_ok=True)
    os.makedirs(val_folder, exist_ok=True)

    # Obtener todos los archivos npy en la carpeta actual
    npy_files = [f for f in os.listdir(shape_path) if f.endswith(".npy")]

    # Diccionario para agrupar archivos por etiqueta
    label_dict = {}

    # Leer las etiquetas de cada archivo
    for file in npy_files:
        file_path = os.path.join(shape_path, file)
        loaded_data = np.load(file_path, allow_pickle=True).item()
        label = loaded_data["label"]  # Obtener la etiqueta del diccionario

        if label not in label_dict:
            label_dict[label] = []
        
        label_dict[label].append(file_path)

    # Dividir cada grupo en 70% train y 30% test (si hay más de 1 archivo)
    for label, files in label_dict.items():
        if len(files) == 1:
            # Si solo hay un archivo, copiarlo a train directamente
            shutil.copy(files[0], os.path.join(train_folder, os.path.basename(files[0])))
        else:
            # Si hay más de 1 archivo, dividirlo normalmente
            train_files, val_files = train_test_split(files, test_size=0.3, random_state=42)

            for file_path in train_files:
                shutil.copy(file_path, os.path.join(train_folder, os.path.basename(file_path)))

            for file_path in val_files:
                shutil.copy(file_path, os.path.join(val_folder, os.path.basename(file_path)))

    print(f"División completada para {shape_folder}")


División completada para shape_3_128
División completada para shape_4_128
División completada para shape_5_128
