# Author: Soham Bhokare 
# Class:  CS 544
# Research

Imports

In [21]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision as tv
from torch.utils.data import Dataset, DataLoader
from torchaudio.datasets.iemocap import IEMOCAP

import matplotlib.pyplot as plt
import numpy as np
import math

import os

import librosa
import pandas as pd
from tqdm import tqdm

from transformers import Wav2Vec2ForAudioFrameClassification, Wav2Vec2ForCTC, Wav2Vec2Tokenizer, Wav2Vec2Processor, pipeline
import IPython.display as display
import soundfile as sf
import speech_recognition as sr

import copy

import logging

logging.basicConfig(filename='debug.log', encoding='utf-8', level=logging.NOTSET)

In [3]:
import time

Constants

In [None]:
# Sampling rate (Hz) passed to librosa.load when reading the example utterance.
RATE = 16000
# Hugging Face checkpoint name handed to AudioFrameRecognizer.
MODEL_NAME = "facebook/wav2vec2-base-960h"

Dataset traversal functions

In [4]:
def get_session_for_file_name(id:str):
    '''
    Map an utterance/file id to its session folder name.

    The id is searched for the markers "Ses01" .. "Ses04" (in that order);
    the first marker found decides the result. Anything else is treated as
    belonging to "Session5".
    '''
    for number in ("1", "2", "3", "4"):
        if ("Ses0" + number) in id:
            return "Session" + number
    return "Session5"
    
def get_session_paths(main_dir):
    '''
    List the session folders found directly inside main_dir.

    main_dir: Directory that contains the "Session*" entries. A trailing
              path separator is optional.
    Output: (file_paths, files) - the joined paths and the bare entry names,
            in the order returned by os.listdir.
    '''
    files = [f for f in os.listdir(main_dir) if str(f).startswith("Session")]
    # os.path.join inserts the separator only when main_dir lacks one, so both
    # "root/" and "root" produce valid paths.
    file_paths = [os.path.join(main_dir, f) for f in files]

    return file_paths, files

def get_dialog_paths(session_dir, script = True):
    '''
    List the dialog recordings of one session.

    session_dir: Path of a session folder; entries are read from its
                 "dialog/wav" sub-directory.
    script: When True keep entries whose name contains "_script",
            otherwise keep entries whose name contains "_impro".
    Output: (file_paths, files) - full paths inside "dialog/wav" and the
            bare entry names, in os.listdir order.
    '''
    wav_dir = os.path.join(session_dir, "dialog", "wav")
    marker = "_script" if script else "_impro"

    files = [
        f for f in os.listdir(wav_dir)
        if str(f).startswith("Ses") and marker in f
    ]
    # The entries live in wav_dir, so the returned paths must point there
    # (previously they were glued directly onto session_dir).
    file_paths = [os.path.join(wav_dir, f) for f in files]

    return file_paths, files


def get_sentence_paths(session_dir, script = True):
    '''
    List the per-dialog sentence folders of one session.

    session_dir: Path of a session folder; entries are read from its
                 "sentences/wav" sub-directory.
    script: When True keep entries whose name contains "_script",
            otherwise keep entries whose name contains "_improv".
    Output: (dir_paths, dirs) - full paths inside "sentences/wav" and the
            bare entry names, in os.listdir order. (The function used to
            compute the filtered list and then return nothing.)
    '''
    wav_dir = os.path.join(session_dir, "sentences", "wav")
    marker = "_script" if script else "_improv"

    dirs = [
        d for d in os.listdir(wav_dir)
        if str(d).startswith("Ses") and marker in d
    ]
    dir_paths = [os.path.join(wav_dir, d) for d in dirs]

    return dir_paths, dirs

Extract data from dataset

In [None]:
filename = "../dataset/IEMOCAP_full_release/"
filename_gz = "../dataset/IEMOCAP_full_release.tar.gz"

In [None]:
# Transcript-level metadata; rows whose Emotion is "xxx" are dropped
# (presumably utterances without an agreed label - TODO confirm).
iemocap_csv1 = pd.read_csv("../dataset/IEMOCAP_Full.csv")
iemocap_csv1 = iemocap_csv1[iemocap_csv1.Emotion!="xxx"]

# Second metadata table (lower-case "emotion" column), filtered the same way.
iemocap_csv2 = pd.read_csv("../dataset/iemocap_full_dataset.csv")
iemocap_csv2 = iemocap_csv2[iemocap_csv2.emotion!="xxx"]


In [None]:
iemocap_csv1.head()

Unnamed: 0.1,Unnamed: 0,ID,Timestamp,Text,Emotion,Valence,Activation,Dominance
0,0,Ses01F_impro01_F000,[6.2901 - 8.2357],Excuse me.,neu,2.5,2.5,2.5
1,1,Ses01F_impro01_F001,[10.0100 - 11.3925],Yeah.,neu,2.5,2.5,2.5
2,2,Ses01F_impro01_F002,[14.8872 - 18.0175],Is there a problem?,neu,2.5,2.5,2.5
5,5,Ses01F_impro01_F005,[27.4600 - 31.4900],Well what's the problem? Let me change it.,neu,2.5,3.5,2.0
6,6,Ses01F_impro01_F006,[38.9650 - 43.5900],What? I'm getting an ID. This is why I'm her...,fru,2.0,3.5,3.5


In [None]:
iemocap_csv2.head()

Unnamed: 0,session,method,gender,emotion,n_annotators,agreement,path
0,1,script,F,neu,3,3,Session1/sentences/wav/Ses01F_script02_1/Ses01...
1,1,script,F,fru,3,2,Session1/sentences/wav/Ses01F_script02_1/Ses01...
3,1,script,F,sur,3,2,Session1/sentences/wav/Ses01F_script02_1/Ses01...
4,1,script,F,neu,3,2,Session1/sentences/wav/Ses01F_script02_1/Ses01...
6,1,script,F,ang,3,2,Session1/sentences/wav/Ses01F_script02_1/Ses01...


In [26]:
def get_device():
    '''
    Return the torch device used for inference (always the CPU).
    '''
    return torch.device('cpu')

Audio splitting Class

In [23]:
class AudioFrameRecognizer():
    '''
    Cut an utterance into one audio frame per word.

    A Wav2Vec2 CTC model transcribes growing windows of the audio; the window
    in which a word first shows up in the transcription marks the end of that
    word. When recognition fails, frames are instead distributed over the
    audio proportionally to the number of characters in each word.

    All frame boundaries are sample indices stored as [start, end] lists.
    '''

    def __init__(self, model_name = "facebook/wav2vec2-base-960h", increment_rate = 0.1, overlap_frame = 4, sample_rate = 16000):
        '''
        model_name: Checkpoint passed to from_pretrained for model and tokenizer
        increment_rate: Window growth step as a fraction of sample_rate
                        (step in samples = sample_rate * increment_rate)
        overlap_frame: Number of steps to back up from the previous word's end
                       when searching for the next word
        sample_rate: Sample rate the audio is expected to have
        '''
        self.device = get_device()
        self.model_name = model_name
        self.model = Wav2Vec2ForCTC.from_pretrained(model_name).to(self.device)
        # NOTE(review): transformers warns that this checkpoint's tokenizer
        # class is Wav2Vec2CTCTokenizer, not Wav2Vec2Tokenizer - consider
        # migrating to Wav2Vec2Processor once outputs are verified to match.
        self.tokenizer = Wav2Vec2Tokenizer.from_pretrained(model_name)
        self.increment_rate = increment_rate
        self.overlap_frame = overlap_frame
        self.sample_rate = sample_rate

    def generate_audio_frames(self, audio, text, default_all = True):
        '''
        Audio: Librosa output
        Text: String
        default_all: When a word cannot be located, True falls back to
                     character-proportional frames for the whole utterance;
                     False estimates only that word and keeps searching.
        Output: (words, segments) - the cleaned text split on single spaces
                and one [start, end] pair per word.
        '''
        increments = self.sample_rate * self.increment_rate
        overlap = self.overlap_frame

        text = clean_text_string(text)
        text_split = text.split(" ")

        segments = []
        for word in text_split:
            if segments:
                # Back up a few steps from the previous word's end so the
                # start of this word is not cut off.
                start_t = int(max(0, segments[-1][1] - (overlap * increments)))
            else:
                start_t = 0

            end_t = self.split_audio(audio, word, start_t, increments)

            if end_t == -1:
                logging.warning("Match not found for - %s", word)
                if default_all:
                    segments = self.default_audio_frames(audio, text)
                    segments = self.add_overlap_end(segments, audio_len = len(audio), addition_rate = 3)
                    segments = self.add_overlap_start(segments, addition_rate = 3)
                    return text_split, segments

                logging.warning("Trying for next word -------")
                end_t = self.default_audio_frame_word(audio, text, word, start_t)

            segments.append([start_t, end_t])

        # Every loop iteration either returned or appended exactly one
        # segment, so there is one segment per word at this point.
        segments = self.remove_overlap_start(segments)
        segments = self.add_overlap_end(segments, audio_len = len(audio))

        return text_split, segments

    def split_audio(self, audio, word, start_time, increments, recog_full = False):
        '''
        Find the end sample of `word` by transcribing growing windows.

        Windows all start at start_time and grow by `increments` samples.
        recog_full: True requires the word to equal one of the recognised
                    words; False (default) accepts a substring match.
        Output: End sample index of the first matching window, or -1.
        '''
        target = word.lower()
        begin = int(start_time)
        step = int(increments)
        audio_len = len(audio)

        window_ends = list(range(int(start_time + increments), audio_len, step))
        # range() never yields audio_len itself, so without this the tail of
        # the recording was never transcribed and words ending there could
        # not match. Only added when the window is at least one step long,
        # i.e. never shorter than the windows tried before.
        if audio_len - begin >= step:
            window_ends.append(audio_len)

        for end in window_ends:
            recognized = self.recognize_audio_segment(audio[begin:end])
            recognized = clean_text_string(recognized.lower())
            candidates = recognized.split(" ") if recog_full else recognized
            if target in candidates:
                return end

        return -1

    def recognize_audio_segment(self, audio):
        '''
        Transcribe one audio window with the CTC model (greedy decoding).
        '''
        input_values = self.tokenizer(audio, return_tensors = 'pt').input_values
        input_values = input_values.to(self.device)

        # Pure inference: skip autograd bookkeeping to save time and memory.
        with torch.no_grad():
            logits = self.model(input_values).logits

        predicted_ids = torch.argmax(logits, dim = -1)
        return self.tokenizer.decode(predicted_ids[0])

    def remove_overlap_start(self, segments, correction_rate = 2.5):
        '''
        Segments: List
        Output: Copy of segments with every non-zero start moved later by
                correction_rate * increment_rate * sample_rate samples.
        '''
        corr = (correction_rate * self.increment_rate * self.sample_rate)
        segments_corr = []
        for segment in segments:
            adjusted = list(segment)  # leave the caller's segment untouched
            if adjusted[0] != 0:
                adjusted[0] += corr
            segments_corr.append(adjusted)

        return segments_corr

    def add_overlap_end(self, segments, audio_len, addition_rate = 0.25):
        '''
        Segments: From previous Step
        Output: Copy of segments whose end is moved later by
                addition_rate * increment_rate * sample_rate samples,
                clamped to audio_len.
        '''
        corr = (addition_rate * self.increment_rate * self.sample_rate)
        segments_corr = []
        for segment in segments:
            adjusted = list(segment)  # leave the caller's segment untouched
            # NOTE(review): segments that START at 0 keep their end unchanged.
            # This guard mirrors add_overlap_start and may be a copy-paste
            # slip; kept as-is because existing outputs depend on it.
            if adjusted[0] != 0:
                adjusted[1] = min(adjusted[1] + corr, audio_len)
            segments_corr.append(adjusted)

        return segments_corr

    def add_overlap_start(self, segments, addition_rate = 0.25):
        '''
        Segments: From previous Step
        Output: Copy of segments with every non-zero start moved earlier by
                addition_rate * increment_rate * sample_rate samples,
                clamped at 0.
        '''
        corr = (addition_rate * self.increment_rate * self.sample_rate)
        segments_corr = []
        for segment in segments:
            adjusted = list(segment)  # leave the caller's segment untouched
            if adjusted[0] != 0:
                adjusted[0] = max(0, adjusted[0] - corr)
            segments_corr.append(adjusted)

        return segments_corr

    def default_audio_frames(self, audio, text):
        '''
        Fallback framing: give each word a share of the audio proportional
        to its character count.

        Frames are laid out from the end of the audio backwards and the first
        frame is stretched to start at sample 0.
        '''
        text_split = text.split(" ")
        tot_char = sum(len(word) for word in text_split)

        audio_len = len(audio)
        # Text without any characters would otherwise divide by zero.
        interval = audio_len // tot_char if tot_char else 0

        segments = []
        end_time = audio_len
        for word in reversed(text_split):
            start_time = end_time - (interval * len(word))
            segments.append([start_time, end_time])
            end_time = start_time

        segments.reverse()
        segments[0][0] = 0

        return segments

    def default_audio_frame_word(self, audio, text, word, start_time, overlap_rate = 0):
        '''
        Fallback for a single word: estimate its end sample from its
        character count, starting at start_time.

        overlap_rate: Extra samples added to the end, expressed in multiples
                      of increment_rate * sample_rate.
        Output: Estimated end sample, clamped to len(audio).
        '''
        tot_char = sum(len(w) for w in text.split(" "))

        audio_len = len(audio)
        # Text without any characters would otherwise divide by zero.
        interval = audio_len // tot_char if tot_char else 0

        end_time = start_time + (interval * len(word))
        end_time += overlap_rate * self.increment_rate * self.sample_rate

        return min(end_time, audio_len)
    

def clean_text_string(text):
    '''
    Keep only ASCII letters and spaces, with words separated by exactly one
    space.

    Text: String
    Output: Cleaned string without leading/trailing spaces. Runs of spaces of
            any length are collapsed (a single replace of double spaces left
            longer runs behind, which produced empty "words" when the result
            was split on " ").
    '''
    whitelist = set('abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ')
    kept = ''.join(ch for ch in text if ch in whitelist)

    # Only plain spaces survive the filter, so split() tokenises on them.
    return ' '.join(kept.split())
    

def segment_audio(audio, segments, text_data:list):
    '''
    Slice the audio into one piece per [start, end] pair.

    Audio: Librosa Speech
    Segments: From previous step; bounds are truncated to int before slicing
    text_data: Words belonging to the segments (only its length is checked)
    Output: List of librosa segments, one per entry of text_data
    '''
    pieces = [audio[int(bounds[0]):int(bounds[1])] for bounds in segments]

    assert len(pieces) == len(text_data)
    return pieces

Take example from dataset

In [None]:
example = 5

# Hand-picked utterances: example number -> (session, dialog id, utterance suffix)
_EXAMPLES = {
    1: ("Session4", "Ses04F_script01_1", "F001"),
    2: ("Session4", "Ses04M_script03_2", "F052"),
    3: ("Session4", "Ses04F_impro03", "F010"),
    4: ("Session4", "Ses04M_script02_2", "F021"),
    5: ("Session5", "Ses05M_script01_1b", "M002"),
}

# Unknown example numbers leave all three values empty.
audio_ses = audio_id1 = audio_id2 = ""

if example in _EXAMPLES:
    audio_ses, audio_id1, _suffix = _EXAMPLES[example]
    audio_id2 = audio_id1 + "_" + _suffix


In [None]:

# Fixed sample path; immediately overridden by the path built from the
# selected example on the next line.
audio_file = "../dataset/IEMOCAP_full_release/Session4/sentences/wav/Ses04F_script01_1/Ses04F_script01_1_F001.wav"
audio_file = "../dataset/IEMOCAP_full_release/" + audio_ses + "/sentences/wav/" + audio_id1 + "/" + audio_id2 + ".wav"


# Load the utterance at RATE Hz and look up its transcript by utterance id.
speech, rate = librosa.load(audio_file, sr = RATE)
text_data = iemocap_csv1[iemocap_csv1["ID"] == audio_id2]["Text"].iloc[0]
print(text_data)

About four this morning.  I heard it crack and I looked out the window and he was standing right there when it cracked.


In [None]:
display.Audio(audio_file,autoplay=True)

Initialize Class

In [None]:
frame_recognizer = AudioFrameRecognizer(MODEL_NAME , overlap_frame = 4)

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'Wav2Vec2CTCTokenizer'. 
The class this function is called from is 'Wav2Vec2Tokenizer'.


Extract Frames

In [None]:
text_segments, audio_segments = frame_recognizer.generate_audio_frames(speech, text_data)

Match not found for - About four this morning I heard it crack and I looked out the window and he was standing right there when it cracked


Check Frames

In [None]:
seg = 19
print(text_segments[seg])
audio_segmented = segment_audio(speech, audio_segments, text_segments)
display.Audio(audio_segmented[seg], autoplay=True, rate=16000)


there


## Final 

In [None]:
frame_recognizer = AudioFrameRecognizer()

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'Wav2Vec2CTCTokenizer'. 
The class this function is called from is 'Wav2Vec2Tokenizer'.


In [None]:
text_segments, audio_segments = frame_recognizer.generate_audio_frames(speech, text_data)

Match not found for - About four this morning I heard it crack and I looked out the window and he was standing right there when it cracked


In [None]:
audio_segments = segment_audio(speech, audio_segments, text_segments)

# Extracting Data

In [6]:
df_train = pd.read_csv("./data/train.csv")

In [7]:
train_Ids = df_train["ID"].tolist()
train_sentences = df_train["Text"].tolist()
train_labels = df_train["Emotion"].tolist()

In [8]:
df_test = pd.read_csv("./data/test.csv")
test_ids = df_test["ID"].to_list()
test_sentences = df_test["Text"].to_list()
# Labels come from the test split's Emotion column, mirroring train_labels;
# this previously read the ID column of the *training* frame.
test_labels = df_test["Emotion"].to_list()

In [27]:
frame_recognizer = AudioFrameRecognizer()

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'Wav2Vec2CTCTokenizer'. 
The class this function is called from is 'Wav2Vec2Tokenizer'.


In [12]:
def get_session_for_file_name(id:str):
    '''
    Return the session folder ("Session1" .. "Session5") for an utterance id.

    Markers are tested in the order Ses01, Ses02, Ses03, Ses04; ids matching
    none of them fall through to "Session5".
    '''
    marker_to_session = (
        ("Ses01", "Session1"),
        ("Ses02", "Session2"),
        ("Ses03", "Session3"),
        ("Ses04", "Session4"),
    )
    for marker, session in marker_to_session:
        if marker in id:
            return session
    return "Session5"

In [13]:
def get_file_path_for_id(id:str):
    '''
    Build the wav path of an utterance inside ./IEMOCAP_full_release.

    The containing folder is the id with its last "_"-separated part removed
    (empty when the id has no underscore).
    '''
    inside_directory = id.rpartition("_")[0]
    session = get_session_for_file_name(id)
    return f"./IEMOCAP_full_release/{session}/sentences/wav/{inside_directory}/{id}.wav"

In [None]:
# Per-utterance word frames for the training split, keyed by utterance id.
final_data_train = {}

for utt_id, sentence, label in tqdm(zip(train_Ids, train_sentences, train_labels), total=len(train_Ids)):
    # librosa.load resamples to 22050 Hz unless told otherwise, but the
    # recognizer computes its windows for its own sample_rate - load at
    # that rate. The returned rate is discarded so it no longer overwrites
    # the `sr` alias of speech_recognition.
    speech, _ = librosa.load(get_file_path_for_id(utt_id), sr=frame_recognizer.sample_rate)
    text_segments, audio_segments = frame_recognizer.generate_audio_frames(speech, sentence)
    audio_segments = segment_audio(speech, audio_segments, text_segments)

    final_data_train[utt_id] = {"segments":audio_segments,"text":text_segments,"label":label}

In [30]:
import pickle

In [None]:
with open('final_data_train_concat.pickle', 'wb') as f:
    # Write the segmented training data to the file using pickle.dump()
    pickle.dump(final_data_train,f)

In [None]:
# Per-utterance word frames for the test split, keyed by utterance id.
final_data_test = {}

for utt_id, sentence, label in tqdm(zip(test_ids, test_sentences, test_labels), total=len(test_ids)):
    # librosa.load resamples to 22050 Hz unless told otherwise, but the
    # recognizer computes its windows for its own sample_rate - load at
    # that rate. The returned rate is discarded so it no longer overwrites
    # the `sr` alias of speech_recognition.
    speech, _ = librosa.load(get_file_path_for_id(utt_id), sr=frame_recognizer.sample_rate)
    text_segments, audio_segments = frame_recognizer.generate_audio_frames(speech, sentence)
    audio_segments = segment_audio(speech, audio_segments, text_segments)

    final_data_test[utt_id] = {"segments":audio_segments,"text":text_segments,"label":label}

In [None]:
with open('final_data_test_concat.pickle', 'wb') as f:
    # Write the segmented test data to the file using pickle.dump()
    pickle.dump(final_data_test,f)

In [None]:
import torch.nn as nn

class BiLSTMTagger(nn.Module):
    '''
    Embedding -> bidirectional LSTM -> Linear(128) -> ELU -> Linear(tagset).

    Produces one score vector of size tagset_size per input token.
    '''

    def __init__(self, vocab_size, tagset_size, embedding_dim, hidden_dim, num_layers, padding_idx=None):
        '''
        vocab_size: Number of rows in the embedding table
        tagset_size: Number of output scores per token
        embedding_dim: Size of each token embedding
        hidden_dim: LSTM hidden size per direction
        num_layers: Number of stacked LSTM layers
        padding_idx: Index of the padding token. When None, it is read from
                     the module-level vocabulary["<PAD>"] as before.
        '''
        super(BiLSTMTagger, self).__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        if padding_idx is None:
            # Backward-compatible default: relies on a global `vocabulary`
            # mapping being defined before the model is constructed.
            padding_idx = vocabulary["<PAD>"]

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)

        # nn.LSTM applies dropout only between stacked layers; requesting it
        # for a single layer has no effect and merely triggers a warning.
        lstm_dropout = 0.33 if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, bidirectional=True, batch_first=True, dropout=lstm_dropout)

        # NOTE(review): this dropout layer is created but never applied in
        # forward(); left untouched so model behaviour does not change.
        self.dropout = nn.Dropout()

        # Both LSTM directions are concatenated, hence hidden_dim * 2.
        self.linear1 = nn.Linear(hidden_dim * 2, 128)
        self.elu = nn.ELU()

        self.output = nn.Linear(128, tagset_size)

    def forward(self, sentence):
        '''
        sentence: Integer token indices; the LSTM is batch_first, so the
                  expected layout is (batch, sequence).
        Output: Scores with tagset_size entries per token.
        '''
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds)

        hidden = self.elu(self.linear1(lstm_out))
        return self.output(hidden)