In [None]:
# Mount Google Drive when running on Colab.
# The import is guarded so that a fresh "Restart & Run All" also works on a
# local kernel, where the google.colab package does not exist and the data is
# read from a local directory instead.
try:
    from google.colab import drive
except ImportError:
    print('google.colab not available; skipping Google Drive mount')
else:
    drive.mount('/content/gdrive')

In [None]:
import pandas as pd
import numpy as np
import os

from numpy import array
from keras.preprocessing.text import one_hot, Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential, Model
from keras.layers import Dense, BatchNormalization
from keras.layers import Flatten, Input, Conv1D, Conv2D, MaxPooling2D, GlobalMaxPooling2D, MaxPooling1D, Dense, GlobalMaxPooling1D, Dropout
from keras.layers.embeddings import Embedding
from keras.layers import Embedding
from keras.layers import Concatenate
from keras.utils import to_categorical
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import librosa
from sklearn.utils import shuffle
import pickle

# Baseline reference implementation:
# https://github.com/SenticNet/MELD/blob/master/baseline/baseline.py

# --- configuration ---------------------------------------------------------
# DATA_PATH = '/content/gdrive/Team Drives/IKE Data/Emotion detection'
DATA_PATH = 'data'
NUM_CLASSES = 7
# Pickle file used to cache the preprocessed audio/text/label arrays.
DATASET_PATH = os.path.join(DATA_PATH, 'dataset_combined.pkl')

# --- raw utterance tables (train / dev / test, read in that order) ---------
train, val, test = [
    pd.read_csv(os.path.join(DATA_PATH, csv_name))
    for csv_name in (
        'emorynlp_train_final.csv',
        'emorynlp_dev_final.csv',
        'emorynlp_test_final.csv',
    )
]
train.head()


In [None]:
# Build the vocabulary from the training utterances only; the same fitted
# tokenizer is later reused to encode every split.
tokenizer = Tokenizer(num_words=2000)
tokenizer.fit_on_texts(train['Utterance'])

# word -> integer index mapping learned by the tokenizer
word_index = tokenizer.word_index
print('Found %s unique tokens.' % (len(word_index),))

In [None]:
# Load the pre-trained GloVe vectors into a {word: float32 vector} lookup.
# Each line of the file is "<word> <v1> <v2> ...".
embeddings_index = {}
glove_path = os.path.join('data', 'glove.6B', 'glove.6B.100d.txt')

# `with` guarantees the handle is closed even if a line fails to parse, and the
# explicit encoding avoids depending on the platform default (which is not
# UTF-8 everywhere and would then fail on non-ASCII tokens).
with open(glove_path, encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

print('Found %s word vectors.' % len(embeddings_index))


In [None]:
def preprocess(data, folder, tokenizer):
    """Turn one split of the utterance table into model-ready arrays.

    For every row whose audio clip exists on disk, the first 2 seconds of the
    clip are converted to a log-mel spectrogram, the utterance text is encoded
    with ``tokenizer`` and the emotion is one-hot encoded. Rows without an
    audio file are reported and skipped.

    Args:
        data: DataFrame with the columns ``Emotion``, ``Utterance``,
            ``Season``, ``Episode``, ``Scene_ID`` and ``Utterance_ID``.
            A ``label`` column is added to it in place.
        folder: Sub-directory of ``data/audio`` holding this split's wav files.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences``.

    Returns:
        A tuple ``([x1, x2], y)`` where ``x1`` has shape ``(n, 96, 188, 1)``
        (spectrograms), ``x2`` has shape ``(n, 33)`` (padded token ids) and
        ``y`` has shape ``(n, NUM_CLASSES)`` (one-hot labels). All three are
        shuffled together with a fixed seed.

    Raises:
        ValueError: If no audio file could be found for any row.
    """
    # NOTE(review): the category codes are derived from this split alone. If a
    # split is missing one of the emotions, its integer codes will not line up
    # with the other splits - confirm every split contains all classes.
    data['label'] = data['Emotion'].astype('category').cat.codes

    labels = []
    spectrums = []
    texts = []

    for index, row in data.iterrows():
        filename = 'sea%d_ep%d_sc%d_utt%d.wav' % (row.Season, row.Episode, row.Scene_ID, row.Utterance_ID)
        path = os.path.join('data', 'audio', folder, filename)
        if not os.path.isfile(path):
            print('file does not exist', path)
            continue

        # Keyword arguments: recent librosa releases no longer accept the
        # sample rate (or the signal, below) positionally.
        sample, sr = librosa.load(path, sr=24000, duration=2)
        assert sr == 24000, 'sample rate incorrect'

        # Zero-pad clips shorter than 2 s to a fixed length so that every
        # spectrogram has the same number of frames.
        padded_sample = np.zeros(48001, dtype='float32')
        padded_sample[:sample.shape[0]] = sample

        melgram = librosa.feature.melspectrogram(
            y=padded_sample, sr=sr, n_fft=512, hop_length=256, n_mels=96)
        # NOTE(review): melspectrogram returns a power spectrogram by default,
        # for which librosa documents power_to_db; amplitude_to_db is kept here
        # so that features stay comparable with previously cached datasets.
        log_melgram = librosa.amplitude_to_db(melgram)
        # add a trailing channel axis for Conv2D
        log_melgram = np.expand_dims(log_melgram, axis=-1)
        assert log_melgram.shape == (96, 188, 1), 'mel shape incorrect'

        labels.append(row.label)
        spectrums.append(log_melgram)
        texts.append(row['Utterance'])

        if index % 500 == 0:
            print('loaded %d from %d rows' % (index, len(data)))

    if not spectrums:
        # np.stack would raise an opaque ValueError on an empty list; fail with
        # a message that points at the actual cause instead.
        raise ValueError(
            'no audio files found for split %r under %s'
            % (folder, os.path.join('data', 'audio', folder)))

    # create np array for audio
    x1 = np.stack(spectrums)

    # create np array for texts
    sequences = tokenizer.texts_to_sequences(texts)
    padded = pad_sequences(sequences, maxlen=33)
    x2 = np.stack(padded)

    # create np array for labels
    y = np.stack(labels)
    y = to_categorical(y, num_classes=NUM_CLASSES)

    # shuffle all three arrays with the same permutation
    x1, x2, y = shuffle(x1, x2, y, random_state=42)

    return [x1, x2], y

# Load the preprocessed dataset from the pickle cache when it exists; otherwise
# build it from the raw audio/text and write the cache for the next run.
# (The original condition was inverted: it tried to open the cache exactly when
# the file was missing and recomputed everything when it was present.)
if os.path.isfile(DATASET_PATH):
    print("loading from cache")
    # NOTE: pickle.load can execute arbitrary code - only load a cache file
    # that was produced by this notebook.
    with open(DATASET_PATH, 'rb') as handle:
        train_x, train_y, test_x, test_y, val_x, val_y, word_index = pickle.load(handle)

else:
    train_x, train_y = preprocess(train, 'train', tokenizer)
    test_x, test_y = preprocess(test, 'test', tokenizer)
    val_x, val_y = preprocess(val, 'dev', tokenizer)
    # word_index is stored alongside the arrays so the cached token ids can
    # still be mapped to embeddings later.
    with open(DATASET_PATH, 'wb') as handle:
        pickle.dump((train_x, train_y, test_x, test_y, val_x, val_y, word_index), handle)

print(train_x[0].shape, train_x[1].shape, train_y.shape)

In [None]:
# Visual sanity check: render the first ten training spectrograms.
for sample_idx in range(10):
    # drop the trailing channel axis so imshow receives a 2-D image
    spectrogram = np.squeeze(train_x[0][sample_idx])
    plt.imshow(spectrogram)
    plt.show()

In [None]:
# create embedding matrix from glove
# Step 1: read the GloVe file into a {word: float32 vector} lookup.
embeddings_index = {}
glove_path = os.path.join('data', 'glove.6B', 'glove.6B.100d.txt')

# `with` closes the handle even if parsing fails; the explicit encoding avoids
# relying on the platform default when reading non-ASCII tokens.
with open(glove_path, encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

print('Found %s word vectors.' % len(embeddings_index))

# Step 2: one 100-d row per tokenizer index. The extra row (index 0) is
# presumably the padding index, which word_index never assigns - it and any
# word without a GloVe vector stay all-zero.
embedding_matrix = np.zeros((len(word_index) + 1, 100))
for word, i in word_index.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

In [None]:
# Make sure each split's inputs are a plain Python list ([audio, text]),
# whatever container they were loaded as.
train_x, val_x, test_x = list(train_x), list(val_x), list(test_x)

In [None]:
# shapes of the audio input, the text input and the one-hot labels
print(*(arr.shape for arr in (train_x[0], train_x[1], train_y)))

In [None]:
# Experiment: draw class-balanced mini-batches with imbalanced-learn.
# NOTE(review): `train_gen` is not passed to model.fit further down (fit is
# called on train_x / train_y directly), so this cell only inspects one batch.
from imblearn.keras import BalancedBatchGenerator
# from imblearn.under_sampling import RandomUnderSampler
from imblearn.under_sampling import NearMiss

# Only the audio array (train_x[0]) is handed to the generator; the text array
# (train_x[1]) is not part of the generated batches.
# NOTE(review): samplers such as NearMiss presumably expect a 2-D feature
# matrix - confirm this works with the 4-D spectrogram array.
train_gen = BalancedBatchGenerator(
    train_x[0], train_y, sampler=NearMiss(), batch_size=32, random_state=42)

# NOTE(review): confirm the generator supports next(); if it is an indexable
# batch sequence, train_gen[0] may be what is needed here.
# NOTE(review): x comes from train_x[0] only, so x[0] / x[1] look like the
# first two samples of the batch rather than the audio / text inputs - verify.
x, y = next(train_gen)
print (x[0].shape, x[1].shape, y.shape)

In [None]:
# ---------------------------------------------------------------------------
# Audio branch: 2-D CNN over the log-mel spectrogram.
# ---------------------------------------------------------------------------
audio_input = Input(shape=train_x[0].shape[1:])

audio_branch = audio_input
# two identical conv -> pool -> batch-norm -> dropout stages (32, then 64 filters)
for n_filters in (32, 64):
    audio_branch = Conv2D(n_filters, 5, activation='relu')(audio_branch)
    audio_branch = MaxPooling2D(3)(audio_branch)
    audio_branch = BatchNormalization()(audio_branch)
    audio_branch = Dropout(0.2)(audio_branch)
# final conv stage collapses the remaining map into one vector per clip
audio_branch = Conv2D(128, 3, activation='relu')(audio_branch)
audio_branch = GlobalMaxPooling2D()(audio_branch)
audio_branch = BatchNormalization()(audio_branch)

audio_output = Dropout(0.2)(audio_branch)

# ---------------------------------------------------------------------------
# Text branch: frozen GloVe embedding followed by a 1-D CNN.
# ---------------------------------------------------------------------------
text_input = Input(shape=(33,), dtype='int32')

text_branch = Embedding(
    len(word_index) + 1,
    100,
    weights=[embedding_matrix],
    input_length=33,
    trainable=False,
)(text_input)
# two identical conv -> pool -> dropout stages
for _ in range(2):
    text_branch = Conv1D(128, 5, activation='relu')(text_branch)
    text_branch = MaxPooling1D(2)(text_branch)
    text_branch = Dropout(0.3)(text_branch)
text_branch = Conv1D(128, 5, activation='relu')(text_branch)
text_branch = GlobalMaxPooling1D()(text_branch)

text_output = Dropout(0.3)(text_branch)

# ---------------------------------------------------------------------------
# Fusion head: concatenate both branches and classify.
# ---------------------------------------------------------------------------
fused = Concatenate()([audio_output, text_output])
fused = Dense(256, activation='relu')(fused)
fused = Dropout(0.2)(fused)
prediction = Dense(NUM_CLASSES, activation='softmax')(fused)

model = Model(inputs=[audio_input, text_input], outputs=prediction)
model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(lr=0.0005),
    metrics=['accuracy'],
)
model.summary()

# Train on the training split; the dev split is evaluated after every epoch.
model.fit(train_x, train_y, validation_data=(val_x, val_y), epochs=20, verbose=1)

# evaluate() needs both inputs and labels. With metrics=['accuracy'] set at
# compile time it returns [loss, accuracy], so index 1 is the test accuracy.
# (The original line had unbalanced parentheses and passed test_y outside the
# evaluate() call, which made the whole cell a syntax error.)
print("Accuracy: %.2f" % model.evaluate(test_x, test_y)[1])

# base: 0.3417