# NLP Challenge
The dataset is a collection of audio clips with expressions of the following emotions: *angry, fear, happy, neutral, sad*.  
  
Your task is to train a model to perform *speech emotion recognition*.  
  
You will be provided with a training set. You are allowed to include additional data to train your model.  
  
The evaluation data will consist of audio clips spoken in multiple languages.  
The *majority* of the evaluation data is spoken with *Singapore English* intonation.  
  
Take inspiration from this [training notebook](https://github.com/AbishekSankar/Audio-Classification-Deep-Learning/blob/main/Demo%20Jupyter%20Notebook/Final_Project.ipynb)  
and from [Speech Emotion Recognition with CNN](https://www.kaggle.com/code/ritzing/speech-emotion-recognition-with-cnn/notebook)

## Possible Extra Datasets
https://www.kaggle.com/datasets/dmitrybabko/speech-emotion-recognition-en (specifically Crema)  
https://www.kaggle.com/datasets/piyushagni5/berlin-database-of-emotional-speech-emodb  

In [1]:
import librosa as lb
from librosa.display import specshow
import glob
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt

from tensorflow.keras.utils import to_categorical
import tensorflow as tf
import tensorflow_io as tfio
import tensorflow_addons as tfa
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

import sklearn as sk

%matplotlib inline

## Audio Data Processing

In [2]:
# Duration (in milliseconds) that every clip is padded or truncated to.
max_ms = 4000

# Class index -> emotion name; the index is the position in this tuple.
ind_to_label = dict(enumerate(('angry', 'fear', 'happy', 'neutral', 'sad')))

# Inverse lookup: emotion name -> class index.
label_to_ind = {name: idx for idx, name in ind_to_label.items()}

### Preprocessing Utils

In [None]:
class aud_util:
    """Low-level audio helpers: loading a file and forcing a fixed clip length."""

    @staticmethod
    def loadaud(audio_file_path, sr=None, mono=False):
        """Load an audio file with librosa.

        Args:
            audio_file_path: Path of the audio file to read.
            sr: Target sampling rate. ``None`` (the default here) keeps the
                file's native rate; any other value makes librosa resample.
            mono: When True, librosa down-mixes the signal to one channel.

        Returns:
            ``(data, sr)`` exactly as returned by ``librosa.load``.

        Mono conversion and resampling are both delegated to ``librosa.load``
        through ``mono`` and ``sr``, so no separate helpers are needed.
        """
        return lb.load(audio_file_path, sr=sr, mono=mono)

    @staticmethod
    def pad_trunc(aud, sr, target_ms):
        """Force a signal to last exactly ``target_ms`` milliseconds.

        Longer signals are cut at the end; shorter signals are zero-padded at
        the end (the original samples always stay at the start).

        Args:
            aud: Audio samples; time is the last axis, so both ``(n,)`` mono
                and ``(channels, n)`` multi-channel arrays are accepted.
            sr: Sampling rate of ``aud`` in Hz.
            target_ms: Desired duration in milliseconds.

        Returns:
            ``(audio, sr)`` where ``audio`` has ``target_ms * sr // 1000``
            samples along its last axis and the same dtype as the input.
        """
        aud = np.asarray(aud)

        # Multiply before dividing so that sub-second parts of target_ms are
        # kept (1500 ms at 16 kHz is 24000 samples, not 16000).
        maxlen = int(target_ms * sr // 1000)
        n_samples = aud.shape[-1]

        if n_samples == maxlen:
            return aud, sr

        if n_samples > maxlen:
            return aud[..., :maxlen], sr

        # Pad only the time axis; np.pad keeps the input dtype, so padded and
        # unpadded clips can be stacked without an implicit up-cast.
        pad_width = [(0, 0)] * (aud.ndim - 1) + [(0, maxlen - n_samples)]
        return np.pad(aud, pad_width, mode='constant'), sr



class aud_img:
    """Convert an audio signal into 2-D time-frequency features."""

    @staticmethod
    def melspec(data, sr):
        """Return a mel spectrogram of ``data`` on a decibel scale.

        Args:
            data: Audio samples.
            sr: Sampling rate of ``data`` in Hz.

        Returns:
            The dB-scaled mel spectrogram computed by librosa.
        """
        # Audio and rate are passed by keyword: librosa >= 0.10 rejects them
        # as positional arguments, and the keyword form works on older
        # releases too.
        # power=1 produces a magnitude (not power) spectrogram, which is why
        # amplitude_to_db is used below instead of power_to_db.
        spec = lb.feature.melspectrogram(y=data, sr=sr, power=1)
        # ref=np.min makes the smallest magnitude the dB reference.
        spec = lb.amplitude_to_db(spec, ref=np.min)
        return spec

    @staticmethod
    def mfcc(data, sr):
        """Return the MFCC matrix of ``data`` using librosa's defaults.

        Args:
            data: Audio samples.
            sr: Sampling rate of ``data`` in Hz.
        """
        mfcc_ = lb.feature.mfcc(y=data, sr=sr)
        # Optional, currently disabled: standardise each coefficient over time
        # with sklearn.preprocessing.scale(mfcc_, axis=1).
        return mfcc_

    # @staticmethod
    # def display_audio_img(spec, sr , mfcc=False):
    #     fig, ax = plt.subplots()
        
    #     if mfcc:
    #         specshow(spec, sr=sr, x_axis='time')
    #     else:
    #         img = specshow(spec, x_axis='time', y_axis='mel', sr=sr, fmax=8000, ax=ax)
    #         fig.colorbar(img, ax=ax, format='%+2.0f dB')



class ds_create:
    """Helpers that turn labelled audio files into model-ready arrays."""

    @staticmethod
    def slices_for_onelabel(path='Data/NLP Training Dataset/', label=None):
        """List the ``.wav`` files of one emotion together with its class index.

        Args:
            path: Root folder that contains one sub-folder per emotion.
            label: Emotion name (a key of ``label_to_ind``). Required; it only
                carries a default because ``path`` has one and Python does
                not allow a non-default parameter after a default one.

        Returns:
            ``(paths, labels)``: the sorted file paths and a list of the same
            length holding the integer class index of ``label``.

        Raises:
            ValueError: If ``label`` is not given.
            KeyError: If ``label`` is not a known emotion.
        """
        import os  # local import: the notebook's import cell does not load os

        if label is None:
            raise ValueError('label is required, e.g. "angry"')

        # os.path.join works whether or not `path` ends with a separator.
        pattern = os.path.join(path, label, '*.wav')
        # Sorted so the result does not depend on the file-system order.
        paths = sorted(glob.glob(pattern))
        labels = [label_to_ind[label]] * len(paths)
        return paths, labels

    @staticmethod
    def _load_fixed_length(path):
        """Load ``path`` as 16 kHz mono and pad/truncate it to ``max_ms``."""
        data, sr = aud_util.loadaud(path, sr=16000, mono=True)
        return aud_util.pad_trunc(data, sr, max_ms)

    @staticmethod
    def preprocess_mel_eachlabel(file_path, label):
        """Return ``(mel, label)`` where ``mel`` is a tensor with a trailing channel axis."""
        data, sr = ds_create._load_fixed_length(file_path)
        mel = aud_img.melspec(data, sr)
        mel = tf.expand_dims(mel, axis=2)

        return mel, label

    @staticmethod
    def dfpremel(path):
        """Return the mel spectrogram of ``path`` as a NumPy array with a trailing channel axis."""
        data, sr = ds_create._load_fixed_length(path)
        mel = aud_img.melspec(data, sr)
        mel = np.expand_dims(mel, axis=2)

        return mel

    @staticmethod
    def dfpremfcc(path):
        """Return the MFCCs of ``path`` as a NumPy array with a trailing channel axis."""
        data, sr = ds_create._load_fixed_length(path)
        mfcc = aud_img.mfcc(data, sr)
        mfcc = np.expand_dims(mfcc, axis=2)

        return mfcc

    @staticmethod
    def dup_channel(img):
        """Repeat a single-channel image three times along the channel axis.

        Args:
            img: Array of shape ``(H, W, 1)`` or ``(H, W)``.

        Returns:
            Array of shape ``(H, W, 3)`` whose three channels are identical.

        Raises:
            ValueError: If ``img`` is not a single-channel image.
        """
        img = np.asarray(img)
        if img.ndim == 2:
            img = img[..., np.newaxis]
        if img.ndim != 3 or img.shape[2] != 1:
            raise ValueError(
                f'expected an image of shape (H, W) or (H, W, 1), got {img.shape}'
            )
        # Repeat only the channel axis; unlike stack + squeeze, this keeps a
        # height or width of 1 intact.
        return np.repeat(img, 3, axis=2)

In [None]:
# Collect the file paths and integer labels of every emotion folder.
_train_root = 'Data/NLP Training Dataset/'
(angry, _0), (fear, _1), (happy, _2), (neutral, _3), (sad, _4) = (
    ds_create.slices_for_onelabel(_train_root, emotion)
    for emotion in ('angry', 'fear', 'happy', 'neutral', 'sad')
)

# Flatten the per-emotion lists; paths and labels stay aligned by position.
slices = [*angry, *fear, *happy, *neutral, *sad]
labels = [*_0, *_1, *_2, *_3, *_4]

In [None]:
# One row per clip: file path, integer label, one-hot label and spectrogram images.
df = pd.DataFrame()

df['relative_audio_paths'] = slices
df['int_labels'] = labels
# Pin the one-hot width to the full label set. Without num_classes the width
# is inferred from the largest label present and can end up narrower than the
# model's output layer if the highest class has no clips.
df['1hot_labels'] = list(to_categorical(labels, num_classes=len(ind_to_label)))

# Single-channel mel image per clip, then a 3-channel copy for the image backbone.
df['imgs_1c'] = list(map(ds_create.dfpremel, slices))
df['imgs_3c'] = df['imgs_1c'].map(ds_create.dup_channel)


# Rows were appended emotion by emotion, so shuffle before any split.
# A fixed seed keeps the train/validation split reproducible between runs.
df = sk.utils.shuffle(df, random_state=42)
df.reset_index(inplace=True, drop=True)

In [None]:
# Preview the first rows of the assembled DataFrame.
df.head()

In [None]:
# Take the image stored in the first row, fifth column, and use its shape as
# the model's input shape.
sample_img = df.iat[0, 4]
input_shape = sample_img.shape
print(input_shape)

## Build Model

In [None]:
# Number of emotion classes, taken from the label table rather than hard-coded.
num_classes = len(ind_to_label)

# ImageNet-pretrained EfficientNetV2-S backbone followed by a small classification head.
xIn = Input(input_shape)
net = tf.keras.applications.efficientnet_v2.EfficientNetV2S(weights='imagenet', include_top=False)
x = net(xIn)
x = Flatten()(x)
x = Dense(128, activation='swish')(x)
x = Dropout(0.5)(x)
xOut = Dense(num_classes, activation='softmax')(x)

model = Model(xIn, xOut)
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss='categorical_crossentropy',
    metrics=[
        'acc',
        # threshold=None scores the argmax class, which is how predictions are
        # decoded later in this notebook. A 0.5 threshold would ignore every
        # sample whose top softmax probability is at or below 0.5.
        tfa.metrics.F1Score(num_classes=num_classes, average='weighted', threshold=None),
    ],
)
model.summary()

In [None]:
batch_size, epochs = 256, 100

# All three callbacks watch the validation loss.
lr_schedule = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', patience=3, factor=0.1, verbose=1)
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
checkpoint = tf.keras.callbacks.ModelCheckpoint('Model_weights', monitor='val_loss', verbose=1, save_best_only=True)
callbacks = [lr_schedule, early_stop, checkpoint]

# Stack the per-row arrays into single batch tensors.
train_images = tf.stack(df['imgs_3c'])
train_targets = tf.stack(df['1hot_labels'])

# Hold out 20% of the samples for validation.
history = model.fit(
    x=train_images,
    y=train_targets,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_split=0.2,
)

## Predict on Evaluation or Test Data

In [None]:
# Smoke test: classify one known training clip.
# NOTE(review): this path starts with 'NLP/' while the rest of the notebook
# reads from 'Data/' - confirm the file exists at this location.
test = np.expand_dims(
    ds_create.dup_channel(
        ds_create.dfpremel('NLP/NLP Training Dataset/fear/00530e07e3.wav')
    ),
    axis=0,  # the model expects a leading batch axis, even for one sample
)
pred = np.argmax(model.predict(test))
pred

In [None]:
class test_gen:
    """Inference-time helpers: file path -> 3-channel feature image, class index -> name."""

    @staticmethod
    def path_to_mel(path):
        """Return the 3-channel mel-spectrogram image of the file at ``path``."""
        return ds_create.dup_channel(ds_create.dfpremel(path))

    @staticmethod
    def path_to_mfcc(path):
        """Return the 3-channel MFCC image of the file at ``path``."""
        return ds_create.dup_channel(ds_create.dfpremfcc(path))

    @staticmethod
    def int_to_label(int):
        """Look up the emotion name of a class index in ``ind_to_label``."""
        label = ind_to_label[int]
        return label

In [None]:
q_df = pd.DataFrame()

# Sorted so that predictions line up with a stable file order.
paths = sorted(glob.glob('Data/NLP Interim Dataset/*.wav'))

# One 3-channel mel image per clip, stacked into a single batch tensor.
q_data = tf.stack([test_gen.path_to_mel(clip_path) for clip_path in paths])

# Most probable class index for every clip.
preds = np.argmax(model.predict(q_data), axis=1)

In [None]:
# x_test = []
# filepath = 'Data/NLP Interim Dataset/NLP Interim Dataset/NLP/*'
# for file in glob.glob(filepath):
#     feature = extract_feature(file, mfcc=False, chroma=False, mel=True)
#     x_test.append(feature)

In [None]:
# # Extract features (mfcc, chroma, mel) from a sound file
# def extract_feature(file_name, mfcc, chroma, mel):
#     with soundfile.SoundFile(file_name) as sound_file:
#         X = sound_file.read(dtype="float32")
#         sample_rate=sound_file.samplerate
#         if chroma:
#             stft=np.abs(librosa.stft(X))
#         result=np.array([])
#         if mfcc:
#             mfccs=np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T, axis=0)
#             result=np.hstack((result, mfccs))
#         if chroma:
#             chroma=np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T,axis=0)
#             result=np.hstack((result, chroma))
#         if mel:
#             mel=np.mean(librosa.feature.melspectrogram(X, sr=sample_rate).T,axis=0)
#             result=np.hstack((result, mel))
#     # print(type(result))
#     # print(result.shape)
#     return result
# x_train, y_train = [], []

# for emotion in emotions_list:
#     filepath = 'Data/NLP Training Dataset/ASR Training Dataset/' + emotion + '/*'
#     for file in glob.glob(filepath):
#         feature = extract_feature(file, mfcc=True, chroma=True, mel=True)
#         x_train.append(feature)
#         y_train.append(emotion)

# x_train = np.array(x_train)
# y_train = np.array(y_train)
# print(type(x_train))
# print(x_train.shape)
# # shuffle order of data and label to match
# def unison_shuffled_arrays(x, y):
#     assert len(x) == len(y)
#     p = np.random.permutation(len(x))
#     return x[p], y[p]

# shuffle_x_train, shuffle_y_train = unison_shuffled_arrays(x_train, y_train)
# print(shuffle_x_train.shape)

# print(len(shuffle_x_train[5]))