In [None]:
import numpy as np
import pandas as pd
import librosa
import os
from tqdm import tqdm
from transformers import BertTokenizer, BertModel
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from warnings import simplefilter
from sklearn.metrics import f1_score
from sklearn.utils import resample
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
def set_seed(seed=42):
    """Seed every random number generator this notebook can draw from.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU plus all CUDA devices), then puts cuDNN into deterministic mode with
    autotuning disabled.

    Args:
        seed: Integer seed applied to every generator.
    """
    import random  # local import keeps this cell self-contained

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


set_seed(42)

In [None]:
# Read the train / validation / test splits from CSV files in the working directory.
data_train, data_val, data_test = (
    pd.read_csv(csv_path)
    for csv_path in (
        'Data_Train_modified.csv',
        'Data_Val_modified.csv',
        'Data_Test_original.csv',
    )
)

In [None]:
tqdm.pandas()  # adds .progress_apply to pandas objects


def _sentiment_label(score):
    """Map a numeric sentiment score to a class label by its sign.

    The label strings (including the spelling 'neutaral') are matched verbatim
    by later cells, so they must not be altered here alone.
    """
    if score > 0:
        return 'positive'
    if score < 0:
        return 'negative'
    return 'neutaral'


# Replace the numeric score column with its class label in every split.
for _split in (data_train, data_val, data_test):
    _split['sentiment'] = _split['sentiment'].progress_apply(_sentiment_label)

In [None]:
# Show how many test rows carry each sentiment label.
data_test.loc[:, "sentiment"].value_counts()

In [None]:
# Balance the training split: keep every neutral row and draw, without
# replacement, the same number of positive and of negative rows.
data_neutral = data_train[data_train["sentiment"] == "neutaral"]
neutral_count = len(data_neutral)

data_positive, data_negative = (
    data_train[data_train["sentiment"] == label]
    for label in ("positive", "negative")
)

data_positive_downsampled, data_negative_downsampled = (
    resample(subset, replace=False, n_samples=neutral_count, random_state=42)
    for subset in (data_positive, data_negative)
)

data_train = pd.concat([data_neutral, data_positive_downsampled, data_negative_downsampled])

In [None]:
def _reindex_and_rename(frame):
    """Return ``frame`` with a fresh default index and ``video`` renamed to ``audio_file``.

    The previous index is moved into an ``index`` column by ``reset_index`` and
    then dropped.
    """
    return (
        frame
        .reset_index()
        .drop(columns=["index"])
        .rename(columns={"video": "audio_file"})
    )


data_train, data_val, data_test = map(_reindex_and_rename, (data_train, data_val, data_test))

In [None]:
# --- Audio settings ---
audio_folder = "./Audio/WAV_16000/"  # directory holding the <audio_file>.wav clips
SAMPLE_RATE = 16000
HOP_LENGTH = 512
N_MFCC = 13

# --- Compute device: use the GPU when one is available ---
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# --- Pretrained BERT used to embed the text column ---
_BERT_CHECKPOINT = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(_BERT_CHECKPOINT)
model = BertModel.from_pretrained(_BERT_CHECKPOINT).to(DEVICE)

In [None]:
def extract_mfcc(file_path, start_time, end_time, n_mfcc):
    """Return the time-averaged MFCC vector for one segment of an audio file.

    Args:
        file_path: Path of the audio file to read.
        start_time: Segment start, in seconds.
        end_time: Segment end, in seconds.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        Flattened array holding the mean of each coefficient over the
        segment's frames.
    """
    # sr=None asks librosa to keep the file's own sampling rate.
    waveform, sample_rate = librosa.load(file_path, sr=None)

    # Convert the second-based bounds into sample positions.
    first, last = (int(seconds * sample_rate) for seconds in (start_time, end_time))
    segment = waveform[first:last]

    coefficients = librosa.feature.mfcc(y=segment, sr=sample_rate, n_mfcc=n_mfcc)
    return coefficients.T.mean(axis=0).flatten()

def extract_pitch(file_path, start_time, end_time):
    """Return one scalar pitch summary for a segment of an audio file.

    Args:
        file_path: Path of the audio file to read.
        start_time: Segment start, in seconds.
        end_time: Segment end, in seconds.

    Returns:
        Mean over every entry of the pitch matrix produced by
        ``librosa.core.piptrack``; the accompanying magnitudes are discarded.
    """
    # sr=None asks librosa to keep the file's own sampling rate.
    waveform, sample_rate = librosa.load(file_path, sr=None)

    first, last = (int(seconds * sample_rate) for seconds in (start_time, end_time))
    segment = waveform[first:last]

    # NOTE(review): the pitch matrix presumably holds zeros wherever no peak was
    # detected, which would pull this mean towards zero — confirm this is intended.
    pitches, _magnitudes = librosa.core.piptrack(y=segment, sr=sample_rate)
    return np.mean(pitches)

def extract_bert_features(text):
    """Embed ``text`` as the mean of BERT's last-layer token vectors.

    Relies on the module-level ``tokenizer``, ``model`` and ``DEVICE``.
    ``model`` must still refer to the BERT encoder when this runs; the notebook
    later rebinds the name ``model`` to the classifier.

    Args:
        text: The string to embed.

    Returns:
        1-D NumPy array: the last hidden state averaged over the token axis.
    """
    # BERT has 512 position embeddings, so longer inputs must be truncated to
    # 512 tokens; a higher cap would let over-long texts crash the forward pass.
    inputs = tokenizer(
        text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512,
    ).to(DEVICE)

    with torch.no_grad():  # inference only, no gradients needed
        outputs = model(**inputs)

    # Average over tokens (dim=1), then take the single batch entry.
    return outputs.last_hidden_state.mean(dim=1).cpu().numpy()[0]

In [None]:
def get_mfcc(data, n_mfcc):
    """Add ``mfcc_0`` … ``mfcc_{n_mfcc-1}`` columns to ``data`` and return it.

    For each row, the MFCC vector is computed by ``extract_mfcc`` from the
    segment ``[start_time, end_time]`` of ``<audio_folder>/<audio_file>.wav``.
    ``data`` is modified in place.

    Args:
        data: DataFrame with ``audio_file``, ``start_time`` and ``end_time`` columns.
        n_mfcc: Number of MFCC coefficients to compute per row.

    Returns:
        The same DataFrame, with the MFCC columns appended.
    """
    def _row_mfcc(row):
        wav_path = os.path.join(audio_folder, row["audio_file"] + ".wav")
        # Use the caller's n_mfcc (not the global N_MFCC) so the vector width
        # always matches the number of columns created below.
        return extract_mfcc(wav_path, row["start_time"], row["end_time"], n_mfcc)

    mfcc_vectors = data.progress_apply(_row_mfcc, axis=1)
    mfcc_columns = [f"mfcc_{i}" for i in range(n_mfcc)]
    data[mfcc_columns] = pd.DataFrame(mfcc_vectors.tolist(), index=data.index)
    return data

def get_pitch(data):
    """Add a ``pitch`` column to ``data`` (in place) and return it.

    Each value is ``extract_pitch`` applied to the row's segment of
    ``<audio_folder>/<audio_file>.wav``.
    """
    def _row_pitch(row):
        wav_path = os.path.join(audio_folder, row["audio_file"] + ".wav")
        return extract_pitch(wav_path, row["start_time"], row["end_time"])

    data["pitch"] = data.progress_apply(_row_pitch, axis=1)
    return data

def get_bert_embedings(data):
    """Add ``bert_0`` … ``bert_767`` columns to ``data`` (in place) and return it.

    Each row's ``text`` is embedded with ``extract_bert_features``; the
    resulting vector is spread over 768 columns.
    """
    # Adding hundreds of columns at once triggers pandas' PerformanceWarning; mute it.
    simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

    embeddings = data["text"].progress_apply(extract_bert_features)
    bert_columns = [f"bert_{position}" for position in range(768)]
    data[bert_columns] = pd.DataFrame(embeddings.tolist(), index=data.index)
    return data

def get_tfidf(vectorizer, data, prefix="tfidf_"):
    """Return ``data`` with one TF-IDF column per vocabulary term appended.

    Args:
        vectorizer: A fitted vectorizer exposing ``transform`` and
            ``get_feature_names_out``.
        data: DataFrame with an ``ASR`` text column.
        prefix: String prepended to every term to form its column name. This
            keeps vocabulary words (e.g. "happy", "positive") from colliding
            with columns that already exist in ``data``.

    Returns:
        A new DataFrame: ``data`` followed by the TF-IDF columns, row-aligned
        with ``data``.
    """
    tfidf = vectorizer.transform(data["ASR"])
    tfidf_columns = [f"{prefix}{term}" for term in vectorizer.get_feature_names_out()]
    # Reuse data's index so the column-wise concat aligns row for row even when
    # data does not carry a default 0..n-1 index.
    df_tfidf = pd.DataFrame(tfidf.toarray(), columns=tfidf_columns, index=data.index)
    return pd.concat([data, df_tfidf], axis=1)

In [None]:
# Audio features (MFCC, pitch) are switched off for this run; uncomment to enable.
# data_train = get_mfcc(data_train, N_MFCC)
# data_val = get_mfcc(data_val, N_MFCC)
# data_test = get_mfcc(data_test, N_MFCC)
#
# data_train = get_pitch(data_train)
# data_val = get_pitch(data_val)
# data_test = get_pitch(data_test)

# NOTE(review): only the train and validation splits are featurized below, so
# data_test carries neither the BERT nor the TF-IDF columns.
# data_test = get_bert_embedings(data_test)
# data_test = get_tfidf(vectorizer, data_test)

# Text embeddings from BERT.
data_train, data_val = (get_bert_embedings(split) for split in (data_train, data_val))

# TF-IDF over the ASR transcripts; the vocabulary is fitted on the training split only.
vectorizer = TfidfVectorizer(max_features=1000).fit(data_train["ASR"])
data_train, data_val = (get_tfidf(vectorizer, split) for split in (data_train, data_val))

In [None]:
def _with_sentiment_dummies(frame):
    """Return ``frame`` with one indicator column per sentiment label appended."""
    indicators = pd.get_dummies(frame['sentiment'])
    return pd.concat([frame, indicators], axis=1)


data_train, data_val, data_test = map(_with_sentiment_dummies, (data_train, data_val, data_test))

In [None]:
# One-hot label columns used as the training target.
_TARGET_COLUMNS = ['negative', 'neutaral', 'positive']

# Everything that must not reach the model as an input feature:
# identifiers, timing, raw text, emotion annotations and the labels themselves.
_NON_FEATURE_COLUMNS = [
    'audio_file', 'start_time', 'end_time', 'sentiment',
    'happy', 'sad', 'anger', 'surprise', 'disgust', 'fear',
    'text', 'ASR',
    'negative', 'neutaral', 'positive',
]


def _split_features_targets(frame):
    """Return ``(features, targets)`` DataFrames taken from ``frame``."""
    features = frame.drop(_NON_FEATURE_COLUMNS, axis=1)
    targets = frame[_TARGET_COLUMNS]
    return features, targets


X_train, y_train = _split_features_targets(data_train)
X_val, y_val = _split_features_targets(data_val)
X_test, y_test = _split_features_targets(data_test)

In [None]:
class CustomDataset(Dataset):
    """In-memory dataset of (input, target) float32 tensor pairs.

    Both arrays are converted to tensors and moved to ``device`` once, at
    construction time, so indexing returns tensors already on that device.
    """

    def __init__(self, inputs, targets, device):
        """Convert ``inputs`` and ``targets`` to float32 tensors on ``device``."""
        as_float = torch.float32
        self.inputs = torch.tensor(inputs, dtype=as_float).to(device)
        self.targets = torch.tensor(targets, dtype=as_float).to(device)

    def __len__(self):
        """Number of samples, i.e. the length of the inputs' first dimension."""
        return len(self.inputs)

    def __getitem__(self, index):
        """Return the ``(input, target)`` pair stored at ``index``."""
        sample = self.inputs[index]
        label = self.targets[index]
        return sample, label

class DeepNeuralAudio(nn.Module):
    """Fully connected classifier: input -> 1024 -> 1024 -> 256 -> num_classes.

    ReLU follows each of the first three layers; the last layer's output is
    returned as-is (raw scores, no softmax).
    """

    def __init__(self, input_shape, num_classes):
        """Build the layers.

        Args:
            input_shape: Shape of the feature matrix; only ``input_shape[1]``
                (the number of features per sample) is used.
            num_classes: Size of the output layer.
        """
        super().__init__()
        n_features = input_shape[1]
        self.linear1 = nn.Linear(n_features, 1024)
        self.linear2 = nn.Linear(1024, 1024)
        self.linear3 = nn.Linear(1024, 256)
        self.linear4 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return the class scores for the batch ``x``."""
        hidden = x
        for layer in (self.linear1, self.linear2, self.linear3):
            hidden = F.relu(layer(hidden))
        return self.linear4(hidden)
    
class DeepNeural(nn.Module):
    """Fully connected classifier: input -> 512 -> 512 -> 64 -> num_classes.

    ReLU follows each of the first three layers; the last layer's output is
    returned as-is (raw scores, no softmax).
    """

    def __init__(self, input_shape, num_classes):
        """Build the layers.

        Args:
            input_shape: Shape of the feature matrix; only ``input_shape[1]``
                (the number of features per sample) is used.
            num_classes: Size of the output layer.
        """
        super().__init__()
        n_features = input_shape[1]
        self.linear1 = nn.Linear(n_features, 512)
        self.linear2 = nn.Linear(512, 512)
        self.linear3 = nn.Linear(512, 64)
        self.linear4 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return the class scores for the batch ``x``."""
        hidden = x
        for layer in (self.linear1, self.linear2, self.linear3):
            hidden = F.relu(layer(hidden))
        return self.linear4(hidden)

In [None]:
set_seed(42)  # re-seed so model initialisation and shuffling are repeatable

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# CustomDataset converts each split to float32 tensors and moves them to
# `device` in its constructor.
trainset, valset, testset = (
    CustomDataset(features.to_numpy(), targets.to_numpy(), device)
    for features, targets in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# Only the training loader shuffles.
trainloader = DataLoader(trainset, shuffle=True, batch_size=4096)
valloader = DataLoader(valset, shuffle=False, batch_size=4096)
testloader = DataLoader(testset, shuffle=False, batch_size=256)

# Note: this rebinds `model`, which earlier in the notebook held the BERT encoder.
n_classes = y_train.shape[1]
model = DeepNeural(X_train.shape, n_classes).to(device)
optimizer = optim.AdamW(model.parameters(), lr=2e-5)
loss_function = nn.CrossEntropyLoss()

In [None]:
NUM_EPOCHS = 160

# Per-epoch loss history (one entry per epoch), e.g. for plotting learning curves.
train_losses = []
val_losses = []

for epoch in range(NUM_EPOCHS):
    # ---- training pass ----
    model.train()
    train_loss = 0
    for X, y in trainloader:
        optimizer.zero_grad()
        output = model(X)
        loss = loss_function(output, y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Average of the per-batch mean losses (not weighted by batch size).
    train_loss /= len(trainloader)

    # ---- validation pass (no gradient tracking) ----
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X, y in valloader:
            output = model(X)
            val_loss += loss_function(output, y).item()

    val_loss /= len(valloader)

    # Record the history; these lists were previously created but never filled.
    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"Epoch {epoch+1}, Training Loss: {train_loss:.4f} , Validation Loss: {val_loss:.4f}")

In [None]:
def evaluate_f1(model, dataloader, device):
    """Compute, print and return the weighted F1-score of ``model`` on ``dataloader``.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        dataloader: Iterable yielding ``(inputs, one_hot_targets)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The weighted F1-score computed over every batch of ``dataloader``.
    """
    model.eval()
    all_preds = []
    all_targets = []

    with torch.no_grad():
        # Iterate the loader that was passed in, not a global one, so the
        # function evaluates whichever split the caller asks for.
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)

            output = model(X)
            preds = torch.argmax(output, dim=1)

            all_preds.extend(preds.cpu().numpy())
            # Targets are one-hot rows; argmax recovers the class index.
            all_targets.extend(torch.argmax(y, dim=1).cpu().numpy())

    all_preds = np.array(all_preds).flatten()
    all_targets = np.array(all_targets).flatten()
    f1 = f1_score(all_targets, all_preds, average='weighted')

    print("F1-score:", f1)
    return f1


val_f1 = evaluate_f1(model, valloader, device)