### Experiments From "Filtered Feelings: Investigating Frequency Filters in Speech Emotion Recognition Models"
Created by: Teun van Gisteren (s1055104)

### Import libraries

In [None]:
# NOTE: This cell might raise a "TypeError: 'type' object is not subscriptable" error. If you run the cell again it should work.

import os
import re
import glob
import csv
from sklearn.metrics import accuracy_score
import numpy as np
from scipy.io import wavfile
from scipy.signal import butter, filtfilt
from os import mkdir
from os import makedirs
from os.path import isdir 
import shutil
from scipy.signal import spectrogram
import matplotlib.pyplot as plt

from speechbrain.inference.interfaces import foreign_class

# 5.3 IEMOCAP Data Handling & 5.4 Preprocessing

In [None]:
# User configuration: replace None with paths on your machine before running the cells below.
iemocap_location = None # The IEMOCAP_full_release folder
results_location = None # Where you want the results .csv files to go

In [None]:
# Function for parsing an IEMOCAP session by creating a dictionary that contains the sentence id as a key and the emotion as a value
def get_IEMOCAP_session_emotions(location, session_id):
    """Map the sentence ids of one IEMOCAP session to their emotion label.

    Reads every ``*.txt`` file in
    ``<location>/Session<session_id>/dialog/EmoEvaluation`` and parses the
    tab-separated summary lines (the ones starting with ``[``), where the
    second column is the sentence id and the third column is the emotion.

    Args:
        location: Path of the folder that contains the ``Session<N>`` folders.
        session_id: Number of the session to parse.

    Returns:
        A dict ``{sentence_id: emotion}`` holding only the sentences labelled
        ``"neu"``, ``"ang"``, ``"sad"`` or ``"hap"``. It is empty when no
        evaluation files are found.
    """
    # The only four emotions the SpeechBrain model recognises. Filtering on
    # this set also drops the undecided ("xxx") and empty labels.
    supported_emotions = {"neu", "ang", "sad", "hap"}

    # os.path.join keeps the pattern valid on every operating system
    emo_files_path = os.path.join(
        location, f"Session{session_id}", "dialog", "EmoEvaluation", "*.txt"
    )

    sentence_emotions = {}
    for emo_file in glob.glob(emo_files_path):
        with open(emo_file) as file:
            for line in file:
                # Only the summary lines hold the file name and emotion
                if not line.startswith("["):
                    continue

                line_parts = line.split("\t")
                # Skip malformed lines instead of failing with an IndexError
                if len(line_parts) < 3:
                    continue

                sentence_emotions[line_parts[1]] = line_parts[2]

    # Filter after collecting everything so a later label for the same
    # sentence id still overrides an earlier one
    return {k: v for k, v in sentence_emotions.items() if v in supported_emotions}

# Function for getting all the individual sentences wav files from the specified session
def get_IEMOCAP_session_files(location, session_id):
    """List the names of all sentence .wav files of one session.

    Looks one level deep inside
    ``<location>/Session<session_id>/sentences/wav``: every subfolder is
    listed and the names ending in ``.wav`` are collected.

    Args:
        location: Path of the folder that contains the ``Session<N>`` folders.
        session_id: Number of the session to list.

    Returns:
        A list of file names (without their folder), in the order returned
        by ``os.listdir``.
    """
    # os.path.join keeps the path valid on every operating system
    files_path = os.path.join(location, f"Session{session_id}", "sentences", "wav")
    files = []

    # For every subfolder in the session
    for folder in os.listdir(files_path):
        folder_path = os.path.join(files_path, folder)

        # Skip stray files next to the subfolders; os.listdir would fail on them
        if not os.path.isdir(folder_path):
            continue

        # Add all the .wav files of the subfolder to the list
        files += [file for file in os.listdir(folder_path) if file.endswith(".wav")]

    return files

# Returns a list of all the files that are also in the emotions file
def filter_appropriate_files(emotions, files, filtered=False):
    """Keep only the file names whose sentence id is a key of `emotions`.

    For unfiltered files the sentence id is the file name without its last
    four characters (the ".wav" extension). For filtered files it is the
    part of the name before the last two underscores, which removes the
    "_l<low>_h<high>.wav" suffix.
    """
    def sentence_id(file_name):
        if filtered:
            return file_name.rsplit("_", 2)[0]
        return file_name[:-4]

    kept = []
    for file_name in files:
        if sentence_id(file_name) in emotions:
            kept.append(file_name)
    return kept

## CSV Export Function

In [None]:
# Function that takes an output path, a file name, and a results list, and writes the results in CSV format to <output path>/<file name>.csv
def results_to_csv(output_path, name, results):
    """Write classification results to ``<output_path>/<name>.csv``.

    Args:
        output_path: Folder to write to; it is created (including parents)
            when it does not exist yet.
        name: File name without the ``.csv`` extension.
        results: Iterable of dicts with the keys ``"Sentence"``,
            ``"Emotion"`` and ``"Emotion_Guess"``; each dict becomes one row.
    """
    # Create output path if it does not exist
    os.makedirs(output_path, exist_ok=True)

    # Absolute path to the new CSV file; os.path.join keeps it portable
    csv_path = os.path.abspath(os.path.join(output_path, f"{name}.csv"))

    # newline='' lets the csv module control the line endings itself
    with open(csv_path, mode='w', newline='', encoding='utf-8') as new_file:
        fieldnames = ["Sentence", "Emotion", "Emotion_Guess"]
        writer = csv.DictWriter(new_file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)

## Accuracy Score Function

In [None]:
# Function to calculate the accuracy of the results, with an optional parameter for the number of decimal places
def calculate_accuracy_str(results, dec_len=10):
    """Return the accuracy of `results` as text, e.g. "Accuracy: 75.00%".

    Each item of `results` supplies the true label under "Emotion" and the
    predicted label under "Emotion_Guess". `dec_len` is the number of
    decimal places shown in the percentage.
    """
    actual = []
    predicted = []
    for row in results:
        actual.append(row["Emotion"])
        predicted.append(row["Emotion_Guess"])

    percentage = accuracy_score(actual, predicted) * 100
    return f"Accuracy: {percentage:.{dec_len}f}%"


## Model Initialization and Function

In [None]:
# Initialize classifier
classifier = foreign_class(
    classname="CustomEncoderWav2vec2Classifier",
    pymodule_file="custom_interface.py",
    source="speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
    # Run on CUDA if you have a capable GPU. Remove this parameter if you do not.
    run_opts={"device":"cuda"},
)

# Function that takes a session id, a dictionary of files to the emotions expressed in those files, the files themselves
# and a boolean for if the files are filtered, this is due to how the files are named.
def run_classifier(location, session, emotions, files, filtered=False):
    """Classify every file in `files` and pair each guess with its true label.

    Args:
        location: Path of the folder that contains the ``Session<N>`` folders.
        session: Number of the session the files belong to.
        emotions: Dict mapping a sentence id to its true emotion label.
        files: List of .wav file names (without folder) to classify.
        filtered: True when the names carry the ``_l<low>_h<high>`` suffix
            of filtered audio; this changes how the folder name and the
            sentence id are derived from the file name.

    Returns:
        A list with one dict per file, holding the keys ``"Sentence"``
        (file name), ``"Emotion"`` (true label) and ``"Emotion_Guess"``
        (first element of the text label returned by the classifier).
    """
    results = []

    # os.path.join keeps the path valid on every operating system
    wav_root = os.path.join(location, f"Session{session}", "sentences", "wav")
    total = len(files)

    # Process each file; counting from 1 gives a nicer progress display
    for iteration, wav_file in enumerate(files, start=1):
        # Show progress text
        progress = iteration / total * 100
        print("\rProcessing: {}, Progress: {:.2f}%".format(wav_file, progress), end="")

        # Derive the folder the .wav file is in and the sentence id used as
        # key in `emotions`; filtered names carry two extra "_" separated parts
        if filtered:
            folder_name = wav_file.rsplit("_", 3)[0]
            sentence_id = wav_file.rsplit("_", 2)[0]
        else:
            folder_name = wav_file.rsplit("_", 1)[0]
            sentence_id = wav_file[:-4]

        # Get path of current .wav file
        file_path = os.path.join(wav_root, folder_name, wav_file)

        # Classify; only the text label of the returned 4-tuple is used
        _, _, _, text_lab = classifier.classify_file(file_path)

        # Append data to results list
        results.append({
            "Sentence": wav_file,
            "Emotion": emotions[sentence_id],
            "Emotion_Guess": text_lab[0],
        })

    return results

# 5.5 Baseline Experiment

In [None]:
# Results of every session, collected for the overall accuracy and the CSV export
combined_results = []

# Loop through each session in the IEMOCAP database (Session1 to Session5,
# the same range the filtered experiments use)
for i in range(1, 6):
    print(f"Session {i}")
    # Get the appropriate files and corresponding emotional labels
    session_emo = get_IEMOCAP_session_emotions(iemocap_location, i)
    session_files = get_IEMOCAP_session_files(iemocap_location, i)

    # Filter files to only keep the ones which are one of the 4 classifiable emotions
    session_appropriate_files = filter_appropriate_files(session_emo, session_files)

    # Run the classifier
    results = run_classifier(iemocap_location, i, session_emo, session_appropriate_files)

    # Combine results with other data
    combined_results = combined_results + results

    # Display classification stats per session
    print("\n", calculate_accuracy_str(results, 2))

    # Print a nice line between sessions. shutil.get_terminal_size falls back
    # to a default width when the output is not attached to a terminal,
    # where os.get_terminal_size would raise an OSError.
    term_size = shutil.get_terminal_size()
    print('=' * term_size.columns)

# Display overall performance stats
print("Overall")
print(calculate_accuracy_str(combined_results, 2))

# Export the results to a .csv file
results_to_csv(results_location, "IEMOCAP_Base", combined_results)

# 5.6 Audio Filter

In [None]:
# The following code is a modified version of the code from this stack overflow post:
# https://stackoverflow.com/questions/21871834/adding-effects-to-make-voice-sound-like-it-s-over-a-telephone

# Creates a Butterworth filter from the low and high boundaries with a given order
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter and return its (b, a) coefficients.

    `lowcut` and `highcut` are the band edges and `fs` is the sampling rate,
    all in the same unit. The edges are passed to `butter` as fractions of
    the Nyquist frequency (half the sampling rate).
    """
    nyquist = fs * 0.5
    normalised_band = [lowcut / nyquist, highcut / nyquist]
    numerator, denominator = butter(order, normalised_band, btype='band')
    return numerator, denominator

# Applies the Butterworth filter 
def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    """Band-pass `data` between `lowcut` and `highcut` and return the result.

    The coefficients come from `butter_bandpass` and are applied with
    `filtfilt`.
    """
    coefficients = butter_bandpass(lowcut, highcut, fs, order=order)
    return filtfilt(*coefficients, data)

def bandpass_filter(buffer, lowcut, highcut, FRAME_RATE):
    """Band-pass `buffer` with a third-order Butterworth filter."""
    return butter_bandpass_filter(
        data=buffer,
        lowcut=lowcut,
        highcut=highcut,
        fs=FRAME_RATE,
        order=3,
    )

# Function that takes a list of files in the input path, filters them with the given low and high boundaries and outputs the files to the output path
def filter_audio(low, high, sample, input_path, output_path, files):
    """Band-pass filter a list of .wav files and write the results to a folder.

    Every output file is named ``<original name>_l<low>_h<high>.wav`` and is
    stored as 16-bit integer audio.

    Args:
        low: Lower edge of the pass band.
        high: Upper edge of the pass band.
        sample: Sample rate every input file is required to have.
        input_path: Folder that contains the files to filter.
        output_path: Folder to write to. WARNING: if it already exists it is
            deleted with all its contents before anything is written.
        files: Names of the .wav files inside `input_path` to filter.

    Raises:
        ValueError: If a file's sample rate differs from `sample`.
    """
    # Always start from an empty output folder, so results of an earlier
    # frequency band cannot linger next to the new ones
    if isdir(output_path):
        shutil.rmtree(output_path)
    makedirs(output_path)

    int16_limits = np.iinfo(np.int16)

    for file in files:
        samplerate, data = wavfile.read(os.path.join(input_path, file))

        # Raise instead of assert: asserts are stripped when Python runs with -O
        if samplerate != sample:
            raise ValueError(
                f"{file}: expected a sample rate of {sample} Hz but found {samplerate} Hz"
            )

        # Filter along axis 0 (the sample axis)
        filtered = np.apply_along_axis(bandpass_filter, 0, data, lowcut=low, highcut=high, FRAME_RATE=sample)

        # The filter can overshoot the 16-bit range; clip before casting so
        # those samples saturate instead of wrapping around to the other sign
        filtered = np.clip(filtered, int16_limits.min, int16_limits.max).astype('int16')

        # file[:-4] drops the ".wav" extension before the band suffix is added
        wavfile.write(os.path.join(output_path, f'{file[:-4]}_l{low}_h{high}.wav'), samplerate, filtered)

## Helper functions

In [None]:
def filter_IEMOCAP_sentence_files(location, output_path, session_id, low_bound, high_bound):
    """Band-pass filter every sentence .wav file of one session.

    The folder layout ``Session<session_id>/sentences/wav/<folder>`` found
    under `location` is mirrored under `output_path`. Each output folder is
    wiped and rewritten by `filter_audio`.

    Args:
        location: Path of the folder that contains the original
            ``Session<N>`` folders.
        output_path: Root folder for the filtered audio; created when missing.
        session_id: Number of the session to filter.
        low_bound: Lower edge of the pass band.
        high_bound: Upper edge of the pass band.
    """
    # os.path.join keeps the paths valid on every operating system
    files_path = os.path.join(location, f"Session{session_id}", "sentences", "wav")

    # Create the output root if it does not exist
    os.makedirs(output_path, exist_ok=True)

    for folder in os.listdir(files_path):
        folder_path = os.path.join(files_path, folder)

        # Skip stray files next to the subfolders; os.listdir would fail on them
        if not os.path.isdir(folder_path):
            continue

        wavfiles = [file for file in os.listdir(folder_path) if file.endswith('.wav')]

        # 16000 is the sample rate filter_audio requires of every file
        filter_audio(low_bound, high_bound, 16000,
                     folder_path,
                     os.path.normpath(os.path.join(output_path, f"Session{session_id}", "sentences", "wav", folder)),
                     wavfiles)

# 5.6.2 Baseline Filter Experiment

In [None]:
# User configuration: replace None with a path on your machine.
# NOTE: filter_audio deletes and recreates the per-dialog folders it writes below this location.
filtered_iemocap_location = None # Where you want the filtered audio to be stored

In [None]:
def classify_filtered_files(low_bound, high_bound):
    """Filter sessions 1 to 5 to one frequency band, classify them and export the results.

    For every session the audio is band-pass filtered into
    `filtered_iemocap_location`, classified, and the session accuracy is
    printed. Afterwards the overall accuracy is printed and all results are
    written to ``IEMOCAP_Filtered_l<low_bound>_h<high_bound>.csv`` in
    `results_location`.

    Args:
        low_bound: Lower edge of the pass band.
        high_bound: Upper edge of the pass band.
    """
    combined_results = []
    # Loop through each session in the IEMOCAP database
    for i in range(1, 6):
        print(f"Session {i}")

        # Filter the audio files
        print("Filtering audio files...")
        filter_IEMOCAP_sentence_files(iemocap_location, filtered_iemocap_location, i, low_bound, high_bound)

        # Labels come from the original dataset, file names from the filtered copy
        session_emo = get_IEMOCAP_session_emotions(iemocap_location, i)
        session_files = get_IEMOCAP_session_files(filtered_iemocap_location, i)

        # Filter files to only keep the ones which are one of the 4 classifiable emotions
        session_appropriate_files = filter_appropriate_files(session_emo, session_files, filtered=True)

        # Run the classifier
        results = run_classifier(filtered_iemocap_location, i, session_emo, session_appropriate_files, filtered=True)

        # Combine results with other data
        combined_results = combined_results + results

        # Display classification stats per session
        print(calculate_accuracy_str(results, 2))

        # Print a nice line between sessions. shutil.get_terminal_size falls
        # back to a default width when the output is not attached to a
        # terminal, where os.get_terminal_size would raise an OSError.
        term_size = shutil.get_terminal_size()
        print('=' * term_size.columns)

    # Display performance stats over whole database
    print("Overall")
    print(calculate_accuracy_str(combined_results, 2))

    # Export the results to a .csv file
    results_to_csv(results_location, f"IEMOCAP_Filtered_l{low_bound}_h{high_bound}", combined_results)

In [None]:
# Widest band the filter accepts:
#   - the lower bound has to be > 0
#   - the higher bound has to be < the Nyquist frequency (8000 in this case)
low_bound, high_bound = 1, 7999

classify_filtered_files(low_bound, high_bound)

## Checking if file is filtered correctly

In [None]:
def plot_waveform(wav_file):
    """Show the spectrogram of `wav_file` on a dB scale.

    Despite its name this function plots a spectrogram (time on the x axis,
    frequency on the y axis), not the raw waveform.
    """
    # Read the WAV file
    samplerate, data = wavfile.read(wav_file)

    # Compute spectrogram and convert the power values to dB
    freqs, times, power = spectrogram(data, fs=samplerate)
    power_db = 10 * np.log10(power)

    # Plot spectrogram
    fig, ax = plt.subplots(figsize=(10,5))
    mesh = ax.pcolormesh(times, freqs, power_db)
    ax.set_xlabel('Time [sec]')
    ax.set_ylabel('Frequency [Hz]')
    ax.set_title(f'Spectrogram of {wav_file}')
    fig.colorbar(mesh, ax=ax, label='Power Spectral Density [dB]')
    fig.tight_layout()
    plt.show()

# Get handpicked file from the dataset and plot the spectrogram
# Location of the handpicked dialog folder, relative to a dataset root.
# os.path.join keeps the paths valid on every operating system.
sample_folder = os.path.join("Session1", "sentences", "wav", "Ses01F_impro01")

# Get handpicked file from the dataset and plot the spectrogram
original_file = os.path.join(iemocap_location, sample_folder, "Ses01F_impro01_F000.wav")
plot_waveform(original_file)

# Here we filter one handpicked file from the IEMOCAP dataset, to confirm the filter is applied correctly
# NOTE: filter_audio empties the output folder before writing to it
filter_audio(300, 3400, 16000,
             os.path.join(iemocap_location, sample_folder),
             os.path.normpath(os.path.join(filtered_iemocap_location, sample_folder)),
             ["Ses01F_impro01_F000.wav"])

# Plot the filtered copy; filter_audio appends the band to the file name
plot_waveform(os.path.join(filtered_iemocap_location, sample_folder, "Ses01F_impro01_F000_l300_h3400.wav"))

In [None]:
# Same 300-3400 Hz band as the single-file check
low_bound, high_bound = 300, 3400

classify_filtered_files(low_bound, high_bound)

# 5.7 Systematically Altering The Frequency Range on Audio Filters

In [None]:
# Function to systematically classify dataset with different frequency ranges
# low_bound is an array with the lower and upper bound of the lower bound
# high_bound is an array with the lower and upper bound of the higher bound
# step_size is the step size between frequencies in these ranges
def systematically_classify_filtered_files(low_bound, high_bound, step_size):
    """Classify the dataset once for every frequency band on a grid.

    `low_bound` and `high_bound` are two-element sequences [start, stop]
    that span the candidate lower and upper band edges; `stop` is exclusive
    and `step_size` is the distance between candidates. Every (low, high)
    pair with high > low is filtered, classified over sessions 1 to 5, and
    written to its own "IEMOCAP_Filtered_l<low>_h<high>.csv" file.
    """
    # Candidate values for the lower and the upper band edge
    low_values = range(low_bound[0], low_bound[1], step_size)
    high_values = range(high_bound[0], high_bound[1], step_size)

    # Collect the bands up front, keeping only those whose upper edge lies
    # strictly above the lower edge
    frequency_bands = []
    for low in low_values:
        for high in high_values:
            if high > low:
                frequency_bands.append((low, high))

    for low, high in frequency_bands:
        combined_results = []
        print(f"Frequency Range between {low} and {high} Hz")

        # Loop through each session in the IEMOCAP database
        for session in range(1, 6):
            print(f"Session {session}")

            # Filter the audio files
            print("Filtering audio files...")
            filter_IEMOCAP_sentence_files(iemocap_location, filtered_iemocap_location, session, low, high)

            # Labels come from the original dataset, file names from the filtered copy
            session_emo = get_IEMOCAP_session_emotions(iemocap_location, session)
            session_files = get_IEMOCAP_session_files(filtered_iemocap_location, session)

            # Keep only the files labelled with one of the 4 classifiable emotions
            session_appropriate_files = filter_appropriate_files(session_emo, session_files, filtered=True)

            # Run the classifier and add its output to the results of this band
            results = run_classifier(filtered_iemocap_location, session, session_emo, session_appropriate_files, filtered=True)
            combined_results.extend(results)
            print("\n")

        results_to_csv(results_location, f"IEMOCAP_Filtered_l{low}_h{high}", combined_results)

In [None]:
# Sweep lower edges 1, 201, ..., 4001 Hz against upper edges 1, 201, ..., 7801 Hz
# (the stop values are exclusive); only pairs with upper > lower are classified.
systematically_classify_filtered_files([1, 4002], [1, 7999], 200)