In [18]:
import pandas as pd
import numpy as np
from transformers import RobertaTokenizer, TFRobertaForSequenceClassification
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_audio, ffmpeg_extract_subclip
import glob
import os
import shutil
import whisper
import tensorflow as tf

In [None]:
# Load the sampled emotion dataset; its 'emotion' column is used in the next cell to build label2id
data_model = pd.read_csv("./data/5000_sampled.csv")
data_model.head()

In [20]:
# Load the saved ROBERTA model
# label2id numbers the distinct values of data_model['emotion'] in order of first appearance;
# in this cell only its length is used, to set the number of output classes.
# NOTE(review): predict_emotions decodes class ids with the hard-coded label_mapping, not with
# label2id — confirm that both agree with the ids the checkpoint was trained with.
label2id = {label: i for i, label in enumerate(data_model['emotion'].unique())}
model = TFRobertaForSequenceClassification.from_pretrained('model/roberta_multi_model_v6', num_labels=len(label2id))

All model checkpoint layers were used when initializing TFRobertaForSequenceClassification.

All the layers of TFRobertaForSequenceClassification were initialized from the model checkpoint at model/roberta_multi_model_v6.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForSequenceClassification for predictions without further training.


In [21]:
# Preview the fragment annotations (episode, start/end time, emotions) that the pipeline reads by default.
# Read without index_col, the CSV's saved index shows up as a leading 'Unnamed: 0' column,
# so 'Episode' is the second column, not the first.
data_filtered = pd.read_csv("../speach_to_text/data_robinson/Robinson_filtered.csv")
data_filtered.head()

Unnamed: 0,Episode,Start_time,End_time,Emotions
0,1,0,124.0,neutral
1,1,124,350.0,"Joy, Surprise, Anticipation"
2,1,350,383.0,neutral
3,1,383,443.0,neutral
4,1,443,504.0,neutral


In [22]:
def split_episodes_into_fragments(data_path="../speach_to_text/data_robinson/Robinson_filtered.csv", source_path="../speach_to_text/episodes/full_episodes/*", target_path="./data/episodes_cut/"):
    """Cut every annotated fragment out of the full episodes and save its audio.

    For each file matched by ``source_path`` the annotation rows of that
    episode are selected, and each (start, end) range is first cut into a
    temporary ``.mov`` clip and then converted to
    ``<target_path>/ep_<episode>_<start>_<end>.mp3``.

    Args:
        data_path: CSV with at least the columns ``Episode``, ``Start_time``
            and ``End_time``.
        source_path: Glob pattern matching the full-episode files.
        target_path: Directory that receives the ``.mp3`` fragments. It is
            created when missing.

    Returns:
        None.

    Raises:
        ValueError: If characters 8-9 of a matched file name are not an integer.
    """
    print("start of split_episodes_into_fragments")
    # Read the data
    fragments = pd.read_csv(data_path)

    # Intermediate video clips live here and are discarded at the end
    temp_dir = "./data/temp_clips"
    os.makedirs(temp_dir, exist_ok=True)
    # ffmpeg does not create missing output directories
    os.makedirs(target_path, exist_ok=True)

    try:
        for file_path in glob.glob(source_path):
            # The episode number is taken from characters 8-9 of the file name.
            # NOTE(review): assumes every matched file follows that naming layout — confirm.
            episode_number = int(os.path.basename(file_path)[8:10])
            episode_rows = fragments[fragments['Episode'] == episode_number]
            # Iterate the two columns separately so each value keeps its own
            # dtype (and therefore its own text form) in the file name.
            for start, end in zip(episode_rows['Start_time'], episode_rows['End_time']):
                clip_name = f"ep_{episode_number}_{start}_{end}"
                clip_path = os.path.join(temp_dir, f"{clip_name}.mov")
                final_path = os.path.join(target_path, f"{clip_name}.mp3")
                ffmpeg_extract_subclip(file_path, start, end, clip_path)
                ffmpeg_extract_audio(clip_path, final_path)
    finally:
        # Remove the temporary directory even when an extraction fails
        shutil.rmtree(temp_dir, ignore_errors=True)
    print("end of split_episodes_into_fragments")

In [23]:
# Function to get the transcript from the fragments
def speach_to_text(data_filtered, whisper_model=None):
    """Transcribe the audio fragment that belongs to every row of ``data_filtered``.

    Args:
        data_filtered: DataFrame with the columns ``Episode``, ``Start_time``
            and ``End_time``; each row names one file
            ``data/episodes_cut/ep_<Episode>_<Start_time>_<End_time>.mp3``.
        whisper_model: Optional object exposing a Whisper-style
            ``transcribe(path, fp16=..., language=...)`` method. When omitted,
            the Whisper "base" model is loaded once for the whole call.

    Returns:
        list: The ``"text"`` entry of each transcription result, in row order.
    """
    print("start of speach_to_text")
    transcript = []

    # Load the model once instead of once per fragment, and only when there
    # is something to transcribe.
    if whisper_model is None and len(data_filtered) > 0:
        whisper_model = whisper.load_model("base")

    # Columns are addressed by name: a CSV read without index_col carries a
    # leading 'Unnamed: 0' column, which shifts every positional index by one.
    for episode_number, start, end in zip(
        data_filtered['Episode'], data_filtered['Start_time'], data_filtered['End_time']
    ):
        clip_path = f"data/episodes_cut/ep_{episode_number}_{start}_{end}.mp3"

        # Transcribe the audio file
        result = whisper_model.transcribe(clip_path, fp16=False, language="en")
        transcript.append(result["text"])

    print("end of speach_to_text")
    return transcript

In [24]:
def save_transcript(data_filtered, transcript, output_path='./data/transcript.csv'):
    """Combine the fragment annotations with their transcripts and store them as CSV.

    Args:
        data_filtered: DataFrame providing the columns ``Episode``,
            ``Start_time``, ``End_time`` and ``Emotions``.
        transcript: Transcripts to place in the ``Transcript`` column.
        output_path: Destination of the CSV file (written without the index).

    Returns:
        pandas.DataFrame: Frame with the columns ``Episode Number``, ``Start``,
        ``End``, ``Emotions`` and ``Transcript``.
    """
    print("start of save_transcript")

    # (output column, source column in data_filtered), in output order
    renamed_sources = (
        ('Episode Number', 'Episode'),
        ('Start', 'Start_time'),
        ('End', 'End_time'),
        ('Emotions', 'Emotions'),
    )
    frame_columns = {target: data_filtered[source] for target, source in renamed_sources}
    frame_columns['Transcript'] = transcript
    transcript_df = pd.DataFrame(frame_columns)

    # Persist the merged table
    transcript_df.to_csv(output_path, index=False)

    print("end of save_transcript")
    return transcript_df

In [25]:
# Class id of each emotion label: labels are numbered from 0 in the order listed.
label_mapping = {
    emotion: class_id
    for class_id, emotion in enumerate(
        ('anger', 'disgust', 'fear', 'happiness', 'sadness', 'surprise')
    )
}




def predict_emotions(model, data_filtered_transcript, batch_size=32):
    """Predict one emotion label per transcript.

    Args:
        model: Sequence-classification model. It is called with a dict of
            tokenizer outputs and must return an object with a ``logits``
            attribute.
        data_filtered_transcript: DataFrame with the columns
            ``Episode Number``, ``Start``, ``End``, ``Emotions`` and
            ``Transcript``.
        batch_size: Number of transcripts sent through the model per call.

    Returns:
        pandas.DataFrame: The five input columns (missing values replaced by
        ``" "``) plus ``Predicted Emotions``, the label whose id in
        ``label_mapping`` equals the arg-max of the logits. The value is NaN
        when that id has no entry in ``label_mapping``.
    """
    print("start of predict_emotions")

    # Clean the text data: missing values become a single space so the
    # tokenizer only receives strings.
    data_filtered_transcript = data_filtered_transcript.fillna(" ")
    texts = data_filtered_transcript['Transcript'].tolist()

    predictions = []
    # Skip tokenizing and inference entirely when there is nothing to classify
    if texts:
        tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
        inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="tf")
        num_samples = len(inputs['input_ids'])

        # Run the model batch by batch
        for i in range(0, num_samples, batch_size):
            batch_inputs = {key: value[i:i+batch_size] for key, value in inputs.items()}
            batch_outputs = model(batch_inputs)
            batch_predictions = tf.argmax(batch_outputs.logits, axis=1)
            predictions.extend(batch_predictions.numpy())

    # Map the class ids back to label names. The Series reuses the input
    # frame's index so pandas pairs every prediction with its own row instead
    # of aligning on a fresh 0..n-1 index.
    id_to_label = {class_id: label for label, class_id in label_mapping.items()}
    predicted_labels = pd.Series(predictions, index=data_filtered_transcript.index).map(id_to_label)

    # Create Dataframe for submission
    submission_df = pd.DataFrame({
        'Episode Number': data_filtered_transcript['Episode Number'],
        'Start': data_filtered_transcript['Start'],
        'End': data_filtered_transcript['End'],
        'Emotions': data_filtered_transcript['Emotions'],
        'Transcript': data_filtered_transcript['Transcript'],
        'Predicted Emotions': predicted_labels
    })
    print("end of predict_emotions")

    return submission_df

In [26]:
# Define the pipeline function
def pipeline(data_path="../speach_to_text/data_robinson/Robinson_filtered.csv", source_path="../speach_to_text/episodes/full_episodes/*", target_path="./data/episodes_cut/", output_path='data/transcript.csv'):
    """Run fragment extraction, transcription and emotion prediction end to end.

    Earlier results are reused: fragments are only cut when ``target_path``
    holds no files, and transcription is skipped when ``output_path`` already
    exists.

    Args:
        data_path: CSV with the fragment annotations.
        source_path: Glob pattern matching the full-episode files.
        target_path: Directory for the audio fragments; created when missing.
        output_path: CSV file used to cache the transcripts.

    Returns:
        The DataFrame returned by ``predict_emotions``.

    Note:
        ``speach_to_text`` reads fragments from ``data/episodes_cut/``, so a
        non-default ``target_path`` only changes where fragments are written.
    """
    print("start of pipeline")
    # Split episodes into fragments unless the fragment directory is already populated.
    # Creating the directory first keeps os.listdir from failing on a fresh checkout.
    os.makedirs(target_path, exist_ok=True)
    if len(os.listdir(target_path)) == 0:
        split_episodes_into_fragments(data_path=data_path, source_path=source_path, target_path=target_path)

    if os.path.exists(output_path):
        # Reuse the cached transcripts
        transcript_df = pd.read_csv(output_path)
    else:
        data_filtered = pd.read_csv(data_path)
        transcript = speach_to_text(data_filtered) # Get transcript from fragments
        transcript_df = save_transcript(data_filtered, transcript, output_path) # Save transcript

    # Predict emotions; `model` is the notebook-level classifier loaded in an earlier cell
    submission_df = predict_emotions(model, transcript_df)
    print("end of pipeline")
    return submission_df

In [None]:
# Call the pipeline function with its default paths to execute the entire process,
# then preview the first rows of the returned predictions
result_df = pipeline()
result_df.head()