In [None]:
import numpy as np 

import pandas as pd 

import matplotlib.pyplot as plt 

import random

import os

In [None]:
# Directory that holds the training video clips
video_dir = '/kaggle/input/PES-ml-hack-link1/train_videos'

# Training metadata table
train_df = pd.read_csv('/kaggle/input/PES-ml-hack-link1/train.csv')





# Build the clip path that belongs to one metadata row
def get_video_clip_path(row):
    """Return the path of the clip named by the row's dialogue and utterance IDs.

    The file name is ``dia<Dialogue_ID>_utt<Utterance_ID>.mp4``; it is joined onto
    the module-level ``video_dir`` as bound at the time of the call.
    """
    dia, utt = row['Dialogue_ID'], row['Utterance_ID']
    return os.path.join(video_dir, f"dia{dia}_utt{utt}.mp4")



# Attach a clip path to every training row
train_df['video_clip_path'] = train_df.apply(get_video_clip_path, axis='columns')

# Preview the ID columns next to the generated paths
preview_columns = ['Dialogue_ID', 'Utterance_ID', 'video_clip_path']
print(train_df[preview_columns].head())

In [None]:
# Dimensions of the training table as (rows, columns)
train_df.shape

In [None]:
# Directory that holds the test video clips (this rebinds the module-level video_dir)
video_dir = '/kaggle/input/PES-ml-hack-link1/test_videos'

# Test metadata table, decoded as cp1252
test_df = pd.read_csv('/kaggle/input/PES-ml-hack-link1/test.csv', encoding='cp1252')





# Map a metadata row to the location of its clip file
def get_video_clip_path(row):
    """Return ``<video_dir>/dia<Dialogue_ID>_utt<Utterance_ID>.mp4`` for ``row``.

    ``video_dir`` is read from module scope when the function is called.
    """
    clip_name = f"dia{row['Dialogue_ID']}_utt{row['Utterance_ID']}.mp4"
    return os.path.join(video_dir, clip_name)



# Attach a clip path to every test row
test_df['video_clip_path'] = test_df.apply(get_video_clip_path, axis='columns')

# Preview the ID columns next to the generated paths
preview_columns = ['Dialogue_ID', 'Utterance_ID', 'video_clip_path']
print(test_df[preview_columns].head())

# Emotion recognition from a video

In [None]:
# Display the test table, including the video_clip_path column added above
test_df

In [None]:
# Derive each utterance's duration in seconds from StartTime/EndTime, then drop
# the two source columns. The guard makes the cell safe to re-run: once the
# columns are gone (after the first run) the block is skipped instead of raising
# a KeyError on the missing 'StartTime'.
if {'StartTime', 'EndTime'}.issubset(train_df.columns):
    start_times = pd.to_datetime(train_df['StartTime'])
    end_times = pd.to_datetime(train_df['EndTime'])

    # Difference of the two timestamps, expressed in seconds
    train_df['duration'] = (end_times - start_times).dt.total_seconds()

    # Rebind rather than mutate in place so the result does not depend on how
    # many times the cell has been executed
    train_df = train_df.drop(columns=['StartTime', 'EndTime'])

print(train_df.head())


In [None]:
# Rows: speakers, columns: emotions, cells: number of utterances (0 when absent)
emotion_counts = train_df.groupby(['Speaker', 'Emotion']).size().unstack(fill_value=0)

# Turn each speaker's counts into proportions by dividing by that speaker's total
speaker_totals = emotion_counts.sum(axis=1)
emotion_profile = emotion_counts.div(speaker_totals, axis=0)

print(emotion_profile.head(n=20))


In [None]:
# Give every row the emotion proportions of its speaker (left join on Speaker).
# On a column-name clash the existing column keeps its name and the profile
# column receives the "_profile" suffix.
train_df = pd.merge(
    train_df,
    emotion_profile,
    how='left',
    on='Speaker',
    suffixes=('', '_profile'),
)
# To remove the profile columns again:
# train_df.drop(columns=['anger', 'joy', 'neutral', 'sadness', 'surprise'], inplace=True)
train_df.head()



In [None]:
# Profile columns holding the per-speaker emotion proportions
emotion_profile_columns = ['anger', 'joy', 'neutral', 'sadness', 'surprise']

# For each row, the emotion whose profile proportion is largest
speaker_profile = train_df[emotion_profile_columns]
train_df['predicted_emotion'] = speaker_profile.idxmax(axis='columns')

# Fraction of rows where that emotion equals the labelled one
accuracy = train_df['predicted_emotion'].eq(train_df['Emotion']).mean()

print(f"Percentage of times the highest probability matches the actual emotion: {accuracy * 100:.2f}%")


In [None]:
# Emotion distribution by Season
import seaborn as sns

# Bar counts of each emotion, grouped by season
fig, ax = plt.subplots(figsize=(10, 6))
sns.countplot(data=train_df, x='Season', hue='Emotion', ax=ax)
ax.set_title('Emotion Distribution by Season')
plt.show()


In [None]:
# Spread of utterance duration within each emotion class
fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(data=train_df, x='Emotion', y='duration', ax=ax)
ax.set_title('Duration Distribution by Emotion')
plt.show()


# Text / Utterance Model

In [None]:
import pandas as pd
import torch
from transformers import BertTokenizer, BertModel
import numpy as np

# Pre-trained tokenizer for the 'bert-base-uncased' checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# train_data is a second name for train_df (same object, not a copy)
train_data = train_df

# Tokenization function
def tokenize_function(text, max_length=128):
    """Encode one utterance with the module-level ``tokenizer``.

    Args:
        text: The utterance to encode.
        max_length: Length every sequence is padded or truncated to. Defaults to
            128, the value that was previously hard-coded, so existing calls
            such as ``Series.apply(tokenize_function)`` behave as before.

    Returns:
        The tokenizer's output for ``text``; PyTorch tensors are requested via
        ``return_tensors="pt"``.
    """
    return tokenizer(
        text,
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors="pt",
    )

# One encoding per training utterance
tokenized_data = train_data['Utterance'].apply(tokenize_function)

# Stack the per-utterance tensors along the batch dimension
input_ids, attention_mask = (
    torch.cat([encoding[key] for encoding in tokenized_data], dim=0)
    for key in ('input_ids', 'attention_mask')
)

# Emotion name -> integer class id
emotion_map = {
    name: class_id
    for class_id, name in enumerate(['anger', 'joy', 'neutral', 'sadness', 'surprise'])
}
labels_text = train_data['Emotion'].map(emotion_map).values

# Prefer the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pre-trained BERT encoder on the selected device, wrapped in DataParallel
bert_model = BertModel.from_pretrained('bert-base-uncased').to(device)
bert_model = torch.nn.DataParallel(bert_model)

# Feature extraction function
def extract_bert_features(input_ids, attention_mask, batch_size=64):
    """Return the [CLS] embedding of every sequence, computed in mini-batches.

    Sending the whole dataset through the model in one forward pass needs memory
    proportional to the dataset size; slicing the inputs into ``batch_size``
    chunks bounds the peak memory while producing the same rows in the same order.

    Args:
        input_ids: Token id tensor whose first dimension indexes sequences.
        attention_mask: Mask tensor aligned with ``input_ids``.
        batch_size: Number of sequences per forward pass (must be >= 1).

    Returns:
        A tensor on ``device`` with one row per input sequence, taken from
        position 0 of ``last_hidden_state``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    total = input_ids.shape[0]
    cls_chunks = []
    with torch.no_grad():
        # max(total, 1) keeps a single (empty) pass for empty input, matching
        # what a direct call on the full tensors would do.
        for start in range(0, max(total, 1), batch_size):
            stop = start + batch_size
            outputs = bert_model(
                input_ids[start:stop].to(device),
                attention_mask=attention_mask[start:stop].to(device),
            )
            # Position 0 of every sequence is the [CLS] token
            cls_chunks.append(outputs.last_hidden_state[:, 0, :])
    return torch.cat(cls_chunks, dim=0)

# [CLS] embeddings for all training utterances
features = extract_bert_features(input_ids, attention_mask)

# NumPy needs the data in host memory, so copy off the device first
features_text = features.cpu().numpy()

# Report the shapes of the feature matrix and the label vector
for caption, array in (("Features shape:", features_text), ("Labels shape:", labels_text)):
    print(caption, array.shape)


In [None]:
# Display the extracted text feature matrix
features_text

In [None]:
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.neural_network import MLPClassifier
# Inputs for the text classifier
X_train_features = features_text  # BERT [CLS] embeddings
y_train = labels_text  # integer emotion ids

# Two hidden layers (256, 128), Adam, early stopping enabled, fixed seed
mlp_params = dict(
    hidden_layer_sizes=(256, 128),
    max_iter=1000,
    activation='relu',
    early_stopping=True,
    learning_rate_init=0.001,
    solver='adam',
    random_state=42,
)
mlp_model = MLPClassifier(**mlp_params)
mlp_model.fit(X_train_features, y_train)

# NOTE: the scores below are computed on the same rows the model was fitted on,
# so they describe training fit rather than held-out performance.
y_pred_mlp = mlp_model.predict(X_train_features)

print("MLP Accuracy:", accuracy_score(y_train, y_pred_mlp))
print("MLP Classification Report:\n", classification_report(y_train, y_pred_mlp))

# Audio Model

In [None]:
!pip install moviepy

In [None]:
import moviepy.editor as mp
from pydub import AudioSegment
import numpy as np
import librosa

def extract_audio_features_from_video(video_path):
    """Extract a 1-D audio descriptor from the soundtrack of a video file.

    The audio track is written to a temporary 16-bit PCM WAV file, loaded with
    pydub, reduced to mono, and summarised with librosa. Each feature is averaged
    over time and the results are concatenated in this order: 13 MFCCs, chroma,
    mel spectrogram, spectral contrast, tonnetz.

    Args:
        video_path: Path of the video file to read.

    Returns:
        A 1-D NumPy array of concatenated time-averaged features.

    Raises:
        ValueError: If the video has no audio track.
    """
    import os
    import tempfile

    video_clip = mp.VideoFileClip(video_path)
    try:
        audio = video_clip.audio
        if audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")

        # Use a unique file per call: this function is run concurrently by
        # joblib workers, and a shared fixed path would let one worker overwrite
        # the audio another worker is about to read.
        fd, audio_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        try:
            audio.write_audiofile(audio_path, codec='pcm_s16le')
            audio_segment = AudioSegment.from_wav(audio_path)
        finally:
            os.remove(audio_path)
    finally:
        # Release the reader resources held by the clip
        video_clip.close()

    # librosa only accepts floating-point audio, so cast the integer PCM samples.
    # The amplitude scale is deliberately left untouched.
    samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float64)

    # Interleaved multi-channel audio -> mono by averaging the channels
    if audio_segment.channels > 1:
        samples = samples.reshape((-1, audio_segment.channels)).mean(axis=1)

    sample_rate = audio_segment.frame_rate
    mfccs = np.mean(librosa.feature.mfcc(y=samples, sr=sample_rate, n_mfcc=13).T, axis=0)
    chroma = np.mean(librosa.feature.chroma_stft(y=samples, sr=sample_rate).T, axis=0)
    mel = np.mean(librosa.feature.melspectrogram(y=samples, sr=sample_rate).T, axis=0)
    contrast = np.mean(librosa.feature.spectral_contrast(y=samples, sr=sample_rate).T, axis=0)
    tonnetz = np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(samples), sr=sample_rate).T, axis=0)

    # Combine all extracted features into one vector
    return np.hstack([mfccs, chroma, mel, contrast, tonnetz])

In [None]:
from joblib import Parallel, delayed

def extract_features_for_row(row):
    """Return the audio features of the clip referenced by ``row['video_clip_path']``."""
    return extract_audio_features_from_video(row['video_clip_path'])

def extract_features_in_parallel(df):
    """Extract audio features for every row of ``df`` using all available cores.

    Returns:
        ``(features, labels)`` as NumPy arrays: one feature entry per row and the
        matching value of the 'Emotion' column, both in row order.
    """
    tasks = (delayed(extract_features_for_row)(record) for _, record in df.iterrows())
    extracted = Parallel(n_jobs=-1)(tasks)

    # Results come back in submission order, so position i belongs to row i
    emotions = [df.iloc[position]['Emotion'] for position in range(len(extracted))]

    return np.array(list(extracted)), np.array(emotions)

# Extract the audio feature matrix and the emotion labels for all training rows
features_all, labels_all = extract_features_in_parallel(train_df)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# Hold out 20% of the audio samples for validation (fixed seed)
split = train_test_split(features_all, labels_all, test_size=0.2, random_state=42)
X_train_split, X_val_split, y_train_split, y_val_split = split

# Random forest with class-balanced weights, fitted on the training part
classifier = RandomForestClassifier(
    n_estimators=100,
    random_state=42,
    class_weight='balanced',
).fit(X_train_split, y_train_split)

# Score the held-out part
y_pred = classifier.predict(X_val_split)
accuracy = accuracy_score(y_val_split, y_pred)
print(f"Validation Accuracy: {accuracy * 100:.2f}%")

# Video Model

In [None]:
import numpy as np
import pandas as pd
import os
import random
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report
import cv2
from imblearn.over_sampling import SMOTE
# Video feature extraction: sample evenly spaced frames and flatten their pixels
def extract_video_features(video_path, num_frames=10):
    """Return a fixed-length pixel vector built from evenly spaced frames.

    ``num_frames`` frame indices are spread linearly over the clip. Every frame
    that can be read is resized to 64x64 and stored in its own slot; slots whose
    frame cannot be read (unreadable file, zero reported frames, failed seek)
    stay zero. The result therefore always has ``num_frames * 64 * 64 * 3``
    entries, so vectors from different clips can be stacked into one 2-D array.
    When every frame is readable the output equals the stacked, flattened frames.

    Args:
        video_path: Path of the video file.
        num_frames: Number of frames to sample.

    Returns:
        A 1-D uint8 NumPy array of length ``num_frames * 64 * 64 * 3``.
    """
    frame_size = (64, 64)
    # Assumes decoded frames have 3 colour channels (OpenCV's default output)
    pixels = np.zeros((num_frames, frame_size[1], frame_size[0], 3), dtype=np.uint8)

    cap = cv2.VideoCapture(video_path)
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Clamp so a clip reporting zero frames does not yield a negative index
        last_index = max(frame_count - 1, 0)
        frame_indices = np.linspace(0, last_index, num_frames, dtype=int)

        for slot, idx in enumerate(frame_indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                # Resize the frame for uniform feature extraction
                pixels[slot] = cv2.resize(frame, frame_size)
    finally:
        # Always release the capture, even if decoding raised
        cap.release()

    return pixels.flatten()

# One flattened frame vector per clip, stacked row-wise for train and test
train_video_features = np.array(list(map(extract_video_features, train_df['video_clip_path'])))
test_video_features = np.array(list(map(extract_video_features, test_df['video_clip_path'])))


In [None]:
# Shape of the stacked training video feature array
train_video_features.shape

In [None]:
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.neural_network import MLPClassifier
# Inputs for the video classifier
X_train_features = train_video_features  # flattened frame pixels (not BERT embeddings)
y_train = labels_text  # integer emotion ids

# Same architecture and settings as the text classifier
mlp_vid_params = dict(
    hidden_layer_sizes=(256, 128),
    max_iter=1000,
    activation='relu',
    early_stopping=True,
    learning_rate_init=0.001,
    solver='adam',
    random_state=42,
)
mlp_model_vid = MLPClassifier(**mlp_vid_params)
mlp_model_vid.fit(X_train_features, y_train)

# NOTE: the scores below are computed on the same rows the model was fitted on,
# so they describe training fit rather than held-out performance.
y_pred_mlp = mlp_model_vid.predict(X_train_features)

print("MLP Accuracy:", accuracy_score(y_train, y_pred_mlp))
print("MLP Classification Report:\n", classification_report(y_train, y_pred_mlp))

# Fusion


In [None]:
# Shapes of the three training feature sets that feed the fusion step.
# The BERT line reads features_text directly: X_train_features was rebound to the
# video features by the video-classifier cell, so it no longer refers to BERT.
print("Audio features shape:", features_all.shape)
print("BERT features shape:", features_text.shape)
print("Video features shape:", train_video_features.shape)



In [None]:
# test_data is a second name for test_df (same object, not a copy)
test_data = test_df

# Encode the test utterances with the same tokenization function as training
tokenized_test_data = test_data['Utterance'].apply(tokenize_function)

# Stack the per-utterance tensors along the batch dimension
input_ids_test, attention_mask_test = (
    torch.cat([encoding[key] for encoding in tokenized_test_data], dim=0)
    for key in ('input_ids', 'attention_mask')
)

# [CLS] embeddings for the test utterances
features_test = extract_bert_features(input_ids_test, attention_mask_test)

# NumPy needs the data in host memory, so copy off the device first
features_test_text = features_test.cpu().numpy()


In [None]:
# Shape of the test text feature matrix
features_test_text.shape

In [None]:
from joblib import Parallel, delayed
import numpy as np

def extract_features_for_row(row):
    """Compute the audio feature vector for the clip at ``row['video_clip_path']``."""
    clip_path = row['video_clip_path']
    features = extract_audio_features_from_video(clip_path)
    return features

def extract_features_in_parallel(df):
    """Extract audio features for every row of ``df`` using all available cores.

    Unlike the training variant, no labels are returned. Results equal to
    ``None`` are left out of the returned array.

    NOTE(review): leaving a result out makes the array shorter than ``df``, so
    row i of the output would no longer correspond to row i of ``df`` — confirm
    that callers can tolerate this.
    """
    tasks = (delayed(extract_features_for_row)(record) for _, record in df.iterrows())
    extracted = Parallel(n_jobs=-1)(tasks)

    usable = [vector for vector in extracted if vector is not None]
    return np.array(usable)

# Extract the audio feature matrix for all test rows
features_audio = extract_features_in_parallel(test_df)

print(features_audio.shape)  # Check the shape of the extracted features


In [None]:
# Shape of the test audio feature matrix
features_audio.shape

In [None]:
# Hard label predictions on the test set, one array per modality.
# The text model must be given the test embeddings (features_test_text);
# features_text holds the training embeddings and would yield predictions for
# the wrong rows.
y_pred_audio = classifier.predict(features_audio)
y_pred_mlp = mlp_model.predict(features_test_text)
y_pred_vid = mlp_model_vid.predict(test_video_features)

In [None]:
# Per-class probabilities on the test set, one matrix per modality.
# NOTE(review): the fusion cell adds these matrices element-wise, which presumes
# every model orders its classes the same way (the audio model was fitted on
# emotion names, the other two on integer ids) — confirm the column order matches.
y_pred_proba_audio = classifier.predict_proba(features_audio)
y_pred_proba_text = mlp_model.predict_proba(features_test_text)
y_pred_proba_vid = mlp_model_vid.predict_proba(test_video_features)

In [None]:

# Late fusion: weighted sum of the per-modality class probabilities
# (audio 0.25, text 0.7, video 0.05 — adjust to reflect trust in each model)
weighted_parts = (
    0.25 * y_pred_proba_audio,
    0.7 * y_pred_proba_text,
    0.05 * y_pred_proba_vid,
)
combined_probs = weighted_parts[0] + weighted_parts[1] + weighted_parts[2]

# Winning class index per row, then both label arrays as strings
final_predictions = combined_probs.argmax(axis=1)
y_val = np.array(y_train, dtype=str)  # or dtype=int based on your data
final_predictions = final_predictions.astype(str)  # or dtype=int

# Optional evaluation (needs labels aligned with the fused predictions):
# print("Weighted Fusion Model Accuracy:", accuracy_score(y_val, final_predictions))
# print("Weighted Fusion Model Classification Report:\n", classification_report(y_val, final_predictions))


In [None]:
# Display the fused class indices (stored as strings)
final_predictions

In [None]:
# Class index (as a string) -> emotion name.
# Note: this rebinds emotion_map, which earlier mapped emotion name -> integer.
emotion_map = {
    str(class_id): name
    for class_id, name in enumerate(['anger', 'joy', 'neutral', 'sadness', 'surprise'])
}

# Translate each fused class index into its emotion name
y_pred_int = np.array(list(map(emotion_map.__getitem__, final_predictions)))
y_pred_int

In [None]:
# Row identifiers from the test table and the predicted emotion names
all_ids = test_df["Sr No."]
all_preds = y_pred_int

# Two-column submission table: identifier and predicted emotion
submission_df = pd.DataFrame({'Sr No.': all_ids, 'Emotion': all_preds})

# Write the table to CSV without the index column
submission_df.to_csv("submission.csv", index=False)