To use this notebook for testing, upload the files attached to the email as follows:
1. [Audio Emotional Analysis] Upload 'audio_emotion_classifier.joblib' to the content folder.
2. [Text Sentiment Analysis] Create a folder named 'sentiment_model' in the content folder, then extract the model files and upload them into it.
3. [Contextual Coherence Model] Create a folder named 'coherence_model' in the content folder, then extract the model files and upload them into it.
4. Upload the test file and set its name in the main pipeline (e.g. dialogue1.txt).

To use this notebook for training:

1. [Audio Emotional Analysis] Remove the 'audio_emotion_classifier.joblib' file from the content folder.
2. [Text Sentiment Analysis] Upload 'train.csv' (the sentiment analysis dataset) to the content folder, set load_model = False in the main pipeline, and run the pipeline.
3. [Contextual Coherence Analysis] Upload 'dialogues_dataset.csv' (the contextual coherence analysis dataset) to the content folder and set load_model = False in the main pipeline.
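
Before running the testing path, it can help to confirm that the uploaded files are where the pipelines expect them. The sketch below is a hypothetical helper (`check_inference_setup` is not part of this notebook) that checks only the names listed in the testing steps above. It assumes the content folder is passed in as `content_dir` and that each model folder merely needs to be non-empty, since the individual model file names are not listed here.

```python
import os

def check_inference_setup(content_dir):
    """Return a list of problems found in the testing layout (empty if none)."""
    problems = []

    # Step 1: the audio classifier is a single joblib file.
    joblib_path = os.path.join(content_dir, "audio_emotion_classifier.joblib")
    if not os.path.isfile(joblib_path):
        problems.append("missing file: audio_emotion_classifier.joblib")

    # Steps 2 and 3: each text model lives in a folder holding the extracted files.
    for folder in ("sentiment_model", "coherence_model"):
        path = os.path.join(content_dir, folder)
        if not os.path.isdir(path):
            problems.append(f"missing folder: {folder}")
        elif not os.listdir(path):
            problems.append(f"empty folder: {folder}")

    return problems
```

On Colab the content folder is typically `/content`, so `check_inference_setup('/content')` returning an empty list would suggest the three model uploads are in place. For training, the first check is expected to fail, because the joblib file is removed on purpose.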

In [None]:
!pip install opensmile
!pip install --upgrade pandas
!pip install xgboost
!apt-get install git-lfs
!git lfs install
!git clone https://github.com/CheyneyComputerScience/CREMA-D.git

In [None]:
#For sentiment
!pip install transformers datasets torch scikit-learn

In [None]:
import os
import pandas as pd
import numpy as np
import opensmile
import audiofile
import joblib
import math
import torch
from xgboost import XGBClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import cross_val_score
from sklearn.exceptions import NotFittedError
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth, files
from oauth2client.client import GoogleCredentials
from transformers import BigBirdTokenizer, BigBirdForSequenceClassification, Trainer, TrainingArguments, DataCollatorWithPadding, AutoTokenizer
from datasets import Dataset
import torch.nn.functional as F

# Audio Processing For Emotional Analysis

In [None]:
# AudioProcessor: Handles loading of audio files
class AudioProcessor:
  """
  AudioProcessor handles loading of audio files.
  It extracts audio signals and sampling rates from audio files.
  """

  def __init__(self, file_paths):
    self.file_paths = file_paths  # List of audio file paths

  def load_audio(self, path):
    """
    Loads an audio file and returns the signal and sampling rate.
    """
    try:
      signal, sampling_rate = audiofile.read(path, always_2d=True)
    except Exception as e:
      print(f"Error loading {path}: {str(e)}")
      return None, None
    return signal, sampling_rate

  def batch_load(self):
    """
    Loads all audio files in batch.
    Returns a list of tuples containing the audio signals and sampling rates.
    """
    signals = []
    for path in self.file_paths:
      signal, sampling_rate = self.load_audio(path)
      if signal is not None:
        signals.append((signal, sampling_rate))
    return signals


# FeatureExtractor: Uses OpenSmile to extract features from audio
class FeatureExtractor:
  """
  Extracts features from audio files using OpenSmile.
  """

  def __init__(self):
    self.smile = opensmile.Smile(
        feature_set=opensmile.FeatureSet.eGeMAPSv02,
        feature_level=opensmile.FeatureLevel.Functionals
    )

  def extract_features(self, signal, sampling_rate):
    """
    Extracts features from a single audio signal using OpenSmile.
    """
    features = self.smile.process_signal(signal, sampling_rate)
    return features

  def extract_batch_from_paths(self, paths):
    """
    Extracts features from a list of audio file paths.
    """
    all_features = []
    for path in paths:
      # audiofile.read raises on unreadable files, so a failure stops the batch
      # instead of silently misaligning features with their labels.
      signal, sampling_rate = audiofile.read(path, always_2d=True)
      features = self.extract_features(signal, sampling_rate)
      all_features.append(features)
    return pd.concat(all_features, ignore_index=True)


# EmotionClassifier: XGBoost classifier with RandomizedSearchCV for hyperparameter tuning
class EmotionClassifier:
  """
  A classifier for predicting emotions using XGBoost with RandomizedSearchCV for faster hyperparameter tuning.
  """

  def __init__(self):
    self.model = XGBClassifier(random_state=42)
    self.label_encoder = LabelEncoder()
    self.scaler = StandardScaler()
    self.is_fitted = False

  def train(self, X_train, y_train):
    """
    Trains the emotion classifier using scaled features and encoded labels.
    Uses RandomizedSearchCV to sample a few hyperparameter combinations.
    """
    y_train_encoded = self.label_encoder.fit_transform(y_train)
    self.feature_names = X_train.columns
    X_train_scaled = self.scaler.fit_transform(X_train)

    # Define the hyperparameter search space (2 x 2 x 3 = 12 combinations)
    param_distributions = {
        'n_estimators': [100, 200],
        'max_depth': [3, 5],
        'learning_rate': [0.01, 0.05, 0.1]
    }

    # Sample 5 of the 12 combinations, scoring each with 3-fold cross-validation
    randomized_search = RandomizedSearchCV(
        estimator=self.model,
        param_distributions=param_distributions,
        n_iter=5,
        cv=3,
        scoring='accuracy',
        verbose=2,
        n_jobs=-1,
        random_state=42  # Make the sampled combinations reproducible
    )
    randomized_search.fit(X_train_scaled, y_train_encoded)

    # Use the best model from RandomizedSearchCV
    self.model = randomized_search.best_estimator_
    self.is_fitted = True
    print(f"Best parameters found: {randomized_search.best_params_}")

    # Evaluate cross-validation scores
    cv_scores = cross_val_score(self.model, X_train_scaled, y_train_encoded, cv=3, scoring='accuracy')
    print(f"Cross-validation scores: {cv_scores}")
    print(f"Mean cross-validation score: {np.mean(cv_scores)}")

  def predict(self, X):
    """
    Predicts emotions on new data. Returns, for each sample, a list of
    PredictionResult objects covering every class, sorted by descending confidence.
    """
    if not self.is_fitted:
      raise NotFittedError("This EmotionClassifier instance is not fitted yet.")

    if not hasattr(self.label_encoder, 'classes_'):
      raise ValueError("LabelEncoder is not fitted yet.")

    X = X[self.feature_names]
    X_scaled = self.scaler.transform(X)
    y_proba = self.model.predict_proba(X_scaled)
    y_classes = self.label_encoder.classes_

    # Create a list of all emotions, levels, and their corresponding confidence scores
    all_predictions = []
    for i in range(len(X)):
      sorted_indices = np.argsort(-y_proba[i])  # Sort by probability in descending order
      predictions_for_sample = []
      for idx in sorted_indices:
        emotion = y_classes[idx]
        prob = y_proba[i][idx]
        predictions_for_sample.append(PredictionResult(emotion, prob))
      all_predictions.append(predictions_for_sample)

    return all_predictions

  def predict_top_label(self, X):
    """
    Predicts the top emotion label for each sample.
    """
    if not self.is_fitted:
      raise NotFittedError("This EmotionClassifier instance is not fitted yet.")

    X = X[self.feature_names]
    X_scaled = self.scaler.transform(X)
    y_pred_encoded = self.model.predict(X_scaled)
    y_pred = self.label_encoder.inverse_transform(y_pred_encoded)
    return y_pred

  def save_model(self, filename):
    """
    Saves the trained model and scaler to a file.
    """
    model_data = {
        'model': self.model,
        'scaler': self.scaler,
        'label_encoder': self.label_encoder,
        'feature_names': self.feature_names
    }
    joblib.dump(model_data, filename)
    print(f"Model, scaler, label encoder, and feature names saved to {filename}")

  def load_model(self, filename):
    """
    Loads the model from a file if it exists.
    """
    if os.path.exists(filename):
      model_data = joblib.load(filename)
      self.model = model_data['model']
      self.scaler = model_data['scaler']
      self.label_encoder = model_data['label_encoder']
      self.feature_names = model_data['feature_names']
      self.is_fitted = True
      print("Model, scaler, label encoder, and feature names loaded successfully.")
    else:
      print("Model file not found. Training a new model.")


# PredictionResult: Stores emotion classification results
class PredictionResult:
  """
  Stores the result of an emotion prediction.
  """

  def __init__(self, label, confidence):
    self.label = label  # Predicted emotion label
    self.confidence = confidence  # Confidence score

  def __repr__(self):
    """
    String representation of the prediction result.
    """
    return f"PredictionResult(label={self.label}, confidence={self.confidence})"


# AudioEmotionDetectionPipeline: Get results
class AudioEmotionDetectionPipeline:
  """
  Manages the workflow:
  - Extracts features using OpenSmile.
  - Trains a model using CREMA-D AudioMP3 files.
  - Predicts emotions on new audio files using the trained model.
  """

  def __init__(self, file_ids):
    self.file_ids = file_ids  # Google Drive audio file IDs
    self.processor = None  # To handle audio file processing
    self.extractor = FeatureExtractor()  # To extract features from audio
    self.classifier = EmotionClassifier()  # Emotion classifier

  def load_crema_d_data(self):
    """
    Loads CREMA-D AudioMP3 dataset, extracting file paths, emotion labels, and emotion levels from filenames.
    Returns a DataFrame with file paths, combined emotion labels and levels.
    """
    audio_dir = './CREMA-D/AudioMP3'
    audio_files = [f for f in os.listdir(audio_dir) if f.endswith('.mp3')]

    # Define emotion and level mappings
    emotions = {
        'ANG': 'Anger',
        'DIS': 'Disgust',
        'FEA': 'Fear',
        'HAP': 'Happiness',
        'NEU': 'Neutral',
        'SAD': 'Sadness'
    }

    levels = {
        'LO': 'Low',
        'MD': 'Medium',
        'HI': 'High',
        'XX': 'Unspecified'
    }

    file_paths = []
    labels = []

    for file in audio_files:
      parts = file.split('_')

      if len(parts) >= 4:
        emotion_code = parts[2]  # The third part is the emotion
        level_code = parts[3].replace('.mp3', '')  # Remove the .mp3 extension

        if emotion_code in emotions and level_code in levels:
          emotion = emotions[emotion_code]
          level = levels[level_code]
          combined_label = f"{emotion}_{level}"  # Combine emotion and level

          file_paths.append(os.path.join(audio_dir, file))
          labels.append(combined_label)

    print(f"Loaded {len(labels)} labels from the files.")

    return pd.DataFrame({'Path': file_paths, 'Label': labels})

  def download_and_extract_features(self):
    """
    Downloads audio files from Google Drive and extracts features.
    Returns features and a list of file paths.
    """
    file_paths = self.download_files_from_drive(self.file_ids)
    self.processor = AudioProcessor(file_paths)
    features = self.extractor.extract_batch_from_paths(file_paths)
    return features, file_paths  # Return features and file_paths

  def download_files_from_drive(self, file_ids):
    """
    Downloads files from Google Drive using file IDs.
    Returns a list of file paths.
    """
    auth.authenticate_user()
    gauth = GoogleAuth()
    gauth.credentials = GoogleCredentials.get_application_default()
    drive = GoogleDrive(gauth)

    file_paths = []
    for filename, file_id in file_ids.items():
      downloaded = drive.CreateFile({'id': file_id})
      downloaded.GetContentFile(filename)
      file_paths.append(filename)
      print(f"{filename} downloaded")
    return file_paths

  def train_classifier(self):
    """
    Trains the emotion classifier using CREMA-D dataset.
    """
    crema_d_data = self.load_crema_d_data()

    # Check the size of the dataset before splitting
    print(f"Dataset size before splitting: {crema_d_data.shape}")

    if crema_d_data.empty:
      print("Error: The dataset is empty!")
      return

    # Load the model if it exists, otherwise train
    self.classifier.load_model('audio_emotion_classifier.joblib')

    if not self.classifier.is_fitted:
      # If the model is not loaded, we need to train it
      X_train, X_test, y_train, y_test = train_test_split(
          crema_d_data['Path'], crema_d_data['Label'], test_size=0.2, random_state=42)

      X_train_features = self.extractor.extract_batch_from_paths(X_train)
      X_test_features = self.extractor.extract_batch_from_paths(X_test)

      self.classifier.train(X_train_features, y_train)

      # Save the trained model
      self.classifier.save_model('audio_emotion_classifier.joblib')

      # Evaluate model performance
      y_test_pred = self.classifier.predict_top_label(X_test_features)
      print("Model evaluation on test set:")
      print(classification_report(y_test, y_test_pred))

      cm = confusion_matrix(y_test, y_test_pred)
      print("Confusion Matrix:")
      print(cm)

  def run(self):
    """
    Runs the entire pipeline and returns predictions for multiple audio files.
    """
    # Train classifier and predict on new audio files
    self.train_classifier()
    audio_features, file_paths = self.download_and_extract_features()

    # Predict on new audio files
    all_predictions = self.classifier.predict(audio_features)

    # Prepare DataFrame for all predictions with audio file reference
    results = []
    for i, sample_predictions in enumerate(all_predictions):
      # Get the corresponding audio file name for this sample
      audio_file = os.path.basename(file_paths[i])  # Get file name
      for pred in sample_predictions:
        emotion, level = pred.label.split('_')
        results.append({
            "audio_file": audio_file,
            "emotion": emotion,
            "level": level,
            "confidence": pred.confidence
        })
    return pd.DataFrame(results)
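
The labels used for training come entirely from the CREMA-D file names, so it may be worth seeing the parsing step on its own. The sketch below mirrors the logic of `load_crema_d_data` in a standalone, hypothetical function (`parse_crema_d_filename` is not defined in this notebook), using the same code tables. The file name in the example is illustrative, and the extension is stripped with `rsplit` rather than `replace('.mp3', '')`.

```python
# Same code tables as in load_crema_d_data
EMOTIONS = {'ANG': 'Anger', 'DIS': 'Disgust', 'FEA': 'Fear',
            'HAP': 'Happiness', 'NEU': 'Neutral', 'SAD': 'Sadness'}
LEVELS = {'LO': 'Low', 'MD': 'Medium', 'HI': 'High', 'XX': 'Unspecified'}

def parse_crema_d_filename(filename):
    """Return the combined 'Emotion_Level' label, or None if the name is not recognized."""
    stem = filename.rsplit('.', 1)[0]  # Drop the file extension
    parts = stem.split('_')            # Expected: actor, sentence, emotion, level
    if len(parts) < 4:
        return None
    emotion = EMOTIONS.get(parts[2])
    level = LEVELS.get(parts[3])
    if emotion is None or level is None:
        return None
    return f"{emotion}_{level}"

label = parse_crema_d_filename("1001_DFA_ANG_XX.mp3")
print(label)                      # Anger_Unspecified
emotion, level = label.split('_')  # The same split that run() applies to each prediction
print(emotion, level)             # Anger Unspecified
```

Because emotion and level are fused into one label, the classifier learns one class per emotion and level combination that appears in the data, and `run()` recovers the two parts by splitting on the underscore.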

# Text Processing with Sentiment Analysis

In [None]:
#For sentiment
# import pandas as pd
# import torch
# from sklearn.model_selection import train_test_split
# from transformers import BigBirdTokenizer, BigBirdForSequenceClassification, Trainer, TrainingArguments
# from transformers import DataCollatorWithPadding
# from datasets import Dataset
# from sklearn.metrics import accuracy_score, precision_recall_fscore_support
# import torch.nn.functional as F

In [None]:
#For sentiment


# Main class that encapsulates all functionalities
class TextSentimentAnalysisPipeline:
    def __init__(self, dataset_path=None, sample_fraction=0.03, load_model=False):
        # Store the dataset location and the fraction of rows sampled for training
        self.file_path = dataset_path
        self.sample_fraction = sample_fraction

        if load_model:
            # Testing: reuse the fine-tuned model saved in ./sentiment_model
            self.tokenizer = BigBirdTokenizer.from_pretrained('./sentiment_model')
            self.model = BigBirdForSequenceClassification.from_pretrained('./sentiment_model')
            print('Loaded model and tokenizer')
        else:
            # Training: read the dataset and start from the pretrained base model
            self.data = pd.read_csv(self.file_path)
            self.tokenizer = BigBirdTokenizer.from_pretrained('google/bigbird-roberta-base')
            self.model = BigBirdForSequenceClassification.from_pretrained('google/bigbird-roberta-base', num_labels=3)
        # Ensure all tensors are contiguous
        self.make_tensors_contiguous()

    class Preprocessor:
        @staticmethod
        def preprocess_data(data):
            # Map sentiments to labels
            label_mapping = {'neutral': 0, 'positive': 1, 'negative': 2}
            data['label'] = data['sentiment'].map(label_mapping)
            return data[['text', 'label']]

        @staticmethod
        def clean_text(texts, labels):
            cleaned_texts = [text for text in texts if isinstance(text, str) and text.strip() != ""]
            cleaned_labels = [label for text, label in zip(texts, labels) if isinstance(text, str) and text.strip() != ""]
            return cleaned_texts, cleaned_labels

    class TokenizerWrapper:
        def __init__(self, tokenizer):
            self.tokenizer = tokenizer

        def tokenize(self, texts):
            return self.tokenizer(texts, truncation=True, padding='max_length', max_length=128)

    class ModelTrainer:
        def __init__(self, model, tokenizer, train_dataset, test_dataset, data_collator):
            self.model = model
            self.tokenizer = tokenizer
            self.train_dataset = train_dataset
            self.test_dataset = test_dataset
            self.data_collator = data_collator

        def train(self):
            # Training arguments
            training_args = TrainingArguments(
                output_dir='./results',
                evaluation_strategy="epoch",
                per_device_train_batch_size=8,
                per_device_eval_batch_size=8,
                num_train_epochs=3,
                weight_decay=0.01,
                logging_dir='./logs',
            )

            # Trainer
            trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=self.train_dataset,
                eval_dataset=self.test_dataset,
                tokenizer=self.tokenizer,
                data_collator=self.data_collator,
                compute_metrics=self.compute_metrics
            )

            # Train the model
            trainer.train()
            self.model.save_pretrained('./sentiment_model')
            self.tokenizer.save_pretrained('./sentiment_model')
            print('Model and tokenizer saved')
            return trainer

        @staticmethod
        def compute_metrics(pred):
            labels = pred.label_ids
            preds = pred.predictions.argmax(-1)
            precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted')
            acc = accuracy_score(labels, preds)
            return {
                'accuracy': acc,
                'precision': precision,
                'recall': recall,
                'f1': f1,
            }

    class SentimentPredictor:
        def __init__(self, tokenizer, model):
            self.tokenizer = tokenizer
            self.model = model

        def predict_sentiment(self, conversation):
            user_text = self.extract_user_text(conversation)
            inputs = self.tokenizer(user_text, return_tensors="pt", truncation=True, padding=True, max_length=128)
            outputs = self.model(**inputs)
            logits = outputs.logits
            probs = F.softmax(logits, dim=1)
            sentiment = {0: "neutral", 1: "positive", 2: "negative"}
            data = {"sentiment":[], "confidence":[]}
            for i, label in sentiment.items():
                data["sentiment"].append(label)
                data["confidence"].append(probs[0][i].item())
            return pd.DataFrame(data)

        @staticmethod
        def extract_user_text(conversation):
            user_texts = [line for line in conversation.split('\n') if line.startswith("User:")]
            return " ".join([text.replace("User:", "").strip() for text in user_texts])

    def run_pipeline(self):
        # Preprocess and sample data
        self.data = self.Preprocessor.preprocess_data(self.data)
        train_data_sampled = self.data.sample(frac=self.sample_fraction, random_state=42)

        # Split the data
        train_texts, test_texts, train_labels, test_labels = train_test_split(
            train_data_sampled['text'].tolist(), train_data_sampled['label'].tolist(), test_size=0.2, random_state=42
        )

        # Clean the texts
        train_texts_clean, train_labels_clean = self.Preprocessor.clean_text(train_texts, train_labels)
        test_texts_clean, test_labels_clean = self.Preprocessor.clean_text(test_texts, test_labels)

        # Tokenization
        tokenizer_wrapper = self.TokenizerWrapper(self.tokenizer)
        train_encodings = tokenizer_wrapper.tokenize(train_texts_clean)
        test_encodings = tokenizer_wrapper.tokenize(test_texts_clean)

        # Create dataset
        train_dataset = Dataset.from_dict({
            'input_ids': train_encodings['input_ids'],
            'attention_mask': train_encodings['attention_mask'],
            'labels': train_labels_clean
        })

        test_dataset = Dataset.from_dict({
            'input_ids': test_encodings['input_ids'],
            'attention_mask': test_encodings['attention_mask'],
            'labels': test_labels_clean
        })

        # Data collator
        data_collator = DataCollatorWithPadding(self.tokenizer)

        # Train the model
        trainer = self.ModelTrainer(self.model, self.tokenizer, train_dataset, test_dataset, data_collator)
        trainer_instance = trainer.train()

        # Evaluate the model
        trainer_instance.evaluate()

    def make_tensors_contiguous(self):
        # Make all tensors in the model contiguous
        for name, param in self.model.named_parameters():
            if not param.is_contiguous():
                #print(f"Making contiguous: {name}")
                param.data = param.data.contiguous()

        # Verify if all tensors are now contiguous
        #for name, param in self.model.named_parameters():
            #print(f"Layer: {name}, Contiguous: {param.is_contiguous()}")

    def predict(self, conversation):
        # Sentiment prediction
        sentiment_predictor = self.SentimentPredictor(self.tokenizer, self.model)
        return sentiment_predictor.predict_sentiment(conversation)
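
Two steps inside `SentimentPredictor` are easy to check without loading the model: only lines that start with `User:` are scored, and the three logits are turned into confidences with a softmax. The sketch below reproduces both in plain Python. The `Agent:` speaker tag and the logit values are made up for illustration, and `softmax` here is a hand-written stand-in for `F.softmax`.

```python
import math

def extract_user_text(conversation):
    """Keep only lines starting with 'User:' and join them into one string."""
    user_lines = [line for line in conversation.split('\n') if line.startswith("User:")]
    return " ".join(line.replace("User:", "").strip() for line in user_lines)

def softmax(logits):
    """Convert raw logits into probabilities that sum to 1."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]  # Subtract the max for numerical stability
    total = sum(exps)
    return [e / total for e in exps]

conversation = "User: I lost my keys.\nAgent: Sorry to hear that.\nUser: Found them, thanks!"
print(extract_user_text(conversation))  # I lost my keys. Found them, thanks!

# Index order used by the pipeline: 0 = neutral, 1 = positive, 2 = negative
labels = ["neutral", "positive", "negative"]
probs = softmax([0.2, 1.5, -0.7])       # Hypothetical logits
for label, prob in zip(labels, probs):
    print(f"{label}: {prob:.3f}")
```

Note that a line with leading whitespace before `User:` would not match `startswith` and would be dropped, so the test file should place the speaker tag at the very start of each line.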



# Contextual Coherence

In [None]:
# Import required libraries
# from transformers import BigBirdForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments, DataCollatorWithPadding
# import torch
# from torch.utils.data import Dataset
# import pandas as pd
# from google.colab import files
# import joblib
# import os
# import torch.nn.functional as F

In [None]:
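# Step 1: Class for defining the custom dataset
# Note: `Dataset` in this notebook is imported from Hugging Face `datasets`,
# so the PyTorch base class is referenced explicitly here.
class DialogueDataset(torch.utils.data.Dataset):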
    def __init__(self, dataframe, tokenizer, max_length):
        self.dataframe = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        context = self.dataframe.iloc[idx, 0]
        response = self.dataframe.iloc[idx, 1]
        label = self.dataframe.iloc[idx, 2]

        combined_text = context + " " + self.tokenizer.sep_token + " " + response
        encoding = self.tokenizer(
            combined_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        input_ids = encoding["input_ids"].squeeze(0)
        attention_mask = encoding["attention_mask"].squeeze(0)

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": torch.tensor(label, dtype=torch.long),
        }

# Step 2: Class for model training
class ModelTrainer:
    def __init__(self, train_dataset):
        self.tokenizer = AutoTokenizer.from_pretrained('google/bigbird-roberta-base')
        self.model = BigBirdForSequenceClassification.from_pretrained('google/bigbird-roberta-base')
        self.train_dataset = train_dataset
        self.training_args = self._setup_training_args()
        # Make all parameter tensors contiguous
        for name, param in self.model.named_parameters():
            if not param.is_contiguous():
                #print(f'Making contiguous:{name}')
                param.data = param.data.contiguous()
        #for name, param in self.model.named_parameters():
            #print(f'Layer:{name}, Contiguous:{param.is_contiguous()}')

    def _setup_training_args(self):
        # Set up training arguments, limiting to 1 epoch for quick testing
        return TrainingArguments(
            output_dir='./results',
            num_train_epochs=1,  # Quick testing with 1 epoch
            per_device_train_batch_size=8,
            learning_rate=2e-5,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir='./logs',
            logging_steps=50,
            save_total_limit=2,
            save_steps=200,
            evaluation_strategy="no",
        )

    def fine_tune_model(self):
        trainer = Trainer(
            model=self.model,
            args=self.training_args,
            train_dataset=self.train_dataset
        )
        trainer.train()
        return self.model

    def save_model(self, save_path):
        self.model.save_pretrained(save_path)
        self.tokenizer.save_pretrained(save_path)
        print(f"Model saved to {save_path}")

# Step 3: Class for coherence evaluation
class CoherenceEvaluator:
    def __init__(self, model_path):
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = BigBirdForSequenceClassification.from_pretrained(model_path)

    def tokenize_input(self, context, response):
        return self.tokenizer(context, response, return_tensors='pt', max_length=1024, truncation=True, padding='max_length')

    def compute_logits(self, inputs):
        outputs = self.model(**inputs)
        return outputs.logits

    def apply_softmax(self, logits):
        probabilities = F.softmax(logits, dim=1)
        return probabilities[0][1].item()

# Step 4: Main pipeline class to encapsulate the entire process
class CoherencePipeline:
    def __init__(self, dataset_path, model_save_path, load_model=False):
        self.file_path = dataset_path
        self.model_save_path = model_save_path
        self.load_model = load_model
        self.model_trainer = None
        self.coherence_evaluator = None

    def prepare_dataset(self):
        df = pd.read_csv(self.file_path)
        tokenizer = AutoTokenizer.from_pretrained('google/bigbird-roberta-base')
        train_dataset = DialogueDataset(df, tokenizer, max_length=256)
        return train_dataset

    def train_and_save_model(self, train_dataset):
        self.model_trainer = ModelTrainer(train_dataset)
        trained_model = self.model_trainer.fine_tune_model()
        self.model_trainer.save_model(self.model_save_path)
        return trained_model

    def evaluate_coherence(self):
        #file_name = list(self.file_path.keys())[0]
        with open(self.file_path, 'r') as file:
            dialogue = file.readlines()

        self.coherence_evaluator = CoherenceEvaluator(self.model_save_path)
        pairs = [(dialogue[i].strip(), dialogue[i + 1].strip()) for i in range(len(dialogue) - 1)]

        scores = []
        for context, response in pairs:
            inputs = self.coherence_evaluator.tokenize_input(context, response)
            logits = self.coherence_evaluator.compute_logits(inputs)
            score = self.coherence_evaluator.apply_softmax(logits)
            scores.append(score)

        # Create DataFrame to store results
        df_results = pd.DataFrame({
            'Pair Number': [f'Pair {i+1}' for i in range(len(pairs))],
            'Context': [pair[0] for pair in pairs],
            'Response': [pair[1] for pair in pairs],
            'Coherence Score': scores
        })

        # Calculate overall coherence score (NaN if the dialogue has fewer than two lines)
        overall_score = sum(scores) / len(scores) if scores else float('nan')
        df_results.loc['Overall'] = ['', '', 'Overall Coherence Score', overall_score]

        return df_results

    def run_pipeline(self):
        if self.load_model:
            # Check if fine-tuned model exists
            if self.model_save_path.startswith('google/'):
                print(f'Using pretrained model from Hugging Face: {self.model_save_path}')
            else:
                if not os.path.exists(self.model_save_path):
                    raise FileNotFoundError(f"No fine-tuned model found at {self.model_save_path}. Please train the model first.")
                print(f"Using existing model from {self.model_save_path}")
        else:
            # Train a new model when load_model is False
            train_dataset = self.prepare_dataset()
            self.train_and_save_model(train_dataset)

        # Proceed to evaluate test data
        df_results = self.evaluate_coherence()
        print(df_results)
        return df_results
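
`evaluate_coherence` scores a dialogue by pairing every line with the line that follows it and averaging the per-pair scores. The sketch below isolates that pairing and averaging in plain Python. The function names `adjacent_pairs` and `overall_coherence` are hypothetical, and the scores are invented placeholders for what the model would return.

```python
def adjacent_pairs(lines):
    """Pair each line with the next one: n lines give n - 1 (context, response) pairs."""
    cleaned = [line.strip() for line in lines]
    return [(cleaned[i], cleaned[i + 1]) for i in range(len(cleaned) - 1)]

def overall_coherence(scores):
    """Unweighted mean of the per-pair coherence scores."""
    return sum(scores) / len(scores)

dialogue = [
    "User: Is the store open today?\n",
    "Agent: Yes, until 6 pm.\n",
    "User: Great, I will stop by.\n",
]
pairs = adjacent_pairs(dialogue)
for context, response in pairs:
    print(f"{context!r} -> {response!r}")

placeholder_scores = [0.75, 0.25]  # One invented score per pair
print(overall_coherence(placeholder_scores))  # 0.5
```

Every line except the first and last appears twice, once as a response and once as a context. Blank lines in the test file would also become contexts or responses, so the input is best kept to one utterance per line with no empty lines.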

# Final Calculation


In [None]:
# import pandas as pd
# import math

# Function to map emotion and level to a score
def map_emotion_to_score(emotion, level):
    emotion_scores = {
        'Happiness': {'Low': 1, 'Medium': 2, 'High': 3, 'Unspecified': 2},
        'Neutral': {'Low': 0, 'Medium': 0, 'High': 0, 'Unspecified': 0},
        'Anger': {'Low': -1, 'Medium': -2, 'High': -3, 'Unspecified': -2},
        'Disgust': {'Low': -1, 'Medium': -2, 'High': -3, 'Unspecified': -2},
        'Fear': {'Low': -1, 'Medium': -2, 'High': -3, 'Unspecified': -2},
        'Sadness': {'Low': -1, 'Medium': -2, 'High': -3, 'Unspecified': -2}
    }
    return emotion_scores.get(emotion, {'Medium': 0}).get(level, 0)

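# Function to map sentiment to score
# Labels are matched case-insensitively because the sentiment pipeline emits
# lowercase labels ('neutral', 'positive', 'negative').
def map_sentiment_to_score(sentiment):
    sentiment_scores = {'neutral': 0, 'negative': -1, 'positive': 1}
    return sentiment_scores.get(str(sentiment).lower(), 0)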

# Function to apply sigmoid transformation and scale
def sigmoid_transform(x):
    x_sigmoid = 1 / (1 + math.exp(-x))
    x_scaled = x_sigmoid * 10
    return x_scaled

# Unified function to calculate the final score from any input format
def calculate_sentiment_score(df):
    if 'emotion' in df.columns and 'level' in df.columns:
        # Process DataFrame with emotions and levels
        df['score'] = df.apply(lambda row: map_emotion_to_score(row['emotion'], row['level']), axis=1)
        df['weighted_score'] = df['score'] * df['confidence']
    elif 'sentiment' in df.columns:
        # Process DataFrame with sentiment and confidence columns
        df['score'] = df.apply(lambda row: map_sentiment_to_score(row['sentiment']), axis = 1)
        df['weighted_score'] = df['score'] * df['confidence']
    else:
        raise ValueError("DataFrame format not recognized.")

    # Calculate weighted sum of scores
    weighted_sum = df['weighted_score'].sum()

    # Calculate total confidence
    total_confidence = df['confidence'].sum()

    # Compute the final raw score
    sentiment_score_raw = weighted_sum / total_confidence if total_confidence != 0 else 0

    # Apply sigmoid transformation to the final score
    sentiment_score = sigmoid_transform(sentiment_score_raw)

    return sentiment_score, total_confidence

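def weighted_score(audio_score, audio_confidence, text_score, text_confidence):
    # Confidence-weighted average of the audio and text scores
    total_confidence = audio_confidence + text_confidence
    if total_confidence == 0:
        return 0
    return (audio_score * audio_confidence + text_score * text_confidence) / total_confidence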

'''
# Example usage for both variations:

# Variation 1: Example DataFrame with emotion, level, and confidence
data_emotion = {
    'audio_file': ['audio1.mp3', 'audio1.mp3', 'audio1.mp3', 'audio1.mp3'],
    'emotion': ['Happiness', 'Anger', 'Neutral', 'Sadness'],
    'level': ['High', 'Medium', 'Unspecified', 'Low'],
    'confidence': [0.6, 0.2, 0.1, 0.1]
}
df_emotion = pd.DataFrame(data_emotion)

# Variation 2: Example DataFrame with sentiment and confidence
data_sentiment = {
    'sentiment': ['neutral', 'positive', 'negative'],
    'confidence': [0.868819, 0.049960, 0.081221]
}
df_sentiment = pd.DataFrame(data_sentiment)

# Calculate the final score for both variations
# (calculate_sentiment_score returns the score and the total confidence)
final_score_emotion, emotion_confidence = calculate_sentiment_score(df_emotion)
final_score_sentiment, sentiment_confidence = calculate_sentiment_score(df_sentiment)

print(f"Final score (Emotion DataFrame): {final_score_emotion:.2f}")
print(f"Final score (Sentiment DataFrame): {final_score_sentiment:.2f}")
'''

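
As a quick sanity check on the scoring math in the cell above, the sketch below recomputes the confidence-weighted mean and the sigmoid scaling by hand. The helper names `weighted_mean` and `to_ten_point_scale` and the sample values are illustrative assumptions and are not part of the pipeline.

```python
import math


def weighted_mean(scores, confidences):
    """Confidence-weighted mean of per-row scores (0.0 when there is no confidence mass)."""
    total = sum(confidences)
    if total == 0:
        return 0.0
    return sum(s * c for s, c in zip(scores, confidences)) / total


def to_ten_point_scale(raw):
    """Squash a raw score with a sigmoid, then stretch it to the 0-10 range."""
    return 10.0 / (1.0 + math.exp(-raw))


# Hypothetical rows: Happiness/High, Anger/Medium, Neutral, Sadness/Low
scores = [3, -2, 0, -1]
confidences = [0.6, 0.2, 0.1, 0.1]

raw = weighted_mean(scores, confidences)  # (1.8 - 0.4 + 0.0 - 0.1) / 1.0
scaled = to_ten_point_scale(raw)
print(f"raw={raw:.2f}, scaled={scaled:.2f}")
```

A raw score of 0 maps to exactly 5.0, so a result of 5.00 means the weighted scores cancelled out or every label fell back to the default score. Because the raw score is bounded by -3 and 3, the scaled score stays between roughly 0.47 and 9.53 and never reaches 0 or 10.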

# Main Pipeline

In [None]:
# Main function to run the pipeline
def main():

  # Define Google Drive file IDs
  audio_file_ids = {
      'audio1.mp3': '108kPpEQeA_6RkQXmmLWDJXQzdiISlm0r'
  }

  # Create and run the AudioEmotionDetectionPipeline
  audio_pipeline = AudioEmotionDetectionPipeline(audio_file_ids)
  audio_results_df = audio_pipeline.run()

  # Output the audio results
  print(audio_results_df)

  # Load the sentiment analysis model and predict sentiment with confidence scores.
  # To train instead, use TextSentimentAnalysisPipeline('/content/train.csv') and call text_pipeline.run_pipeline().
  text_pipeline = TextSentimentAnalysisPipeline(load_model=True)
  textresults_df = text_pipeline.predict('/content/dialogue1.txt')  # comment out when training
  print(textresults_df)

  final_score_emotion, emotion_confidence = calculate_sentiment_score(audio_results_df)
  final_score_sentiment, sentiment_confidence = calculate_sentiment_score(textresults_df)

  print(f"Final score (Emotion DataFrame mingyao): {final_score_emotion:.2f}")
  print(f"Final score (Sentiment DataFrame bhavik): {final_score_sentiment:.2f}")

  user_satisfaction = weighted_score(final_score_emotion, emotion_confidence, final_score_sentiment, sentiment_confidence)
  print(f"Weighted score for user satisfaction: {user_satisfaction:.2f}")

  coherence_pipeline = CoherencePipeline(
    dataset_path = '/content/dialogue1.txt', # Change to '/content/dialogues_dataset.csv' if you want to train
    model_save_path='./coherence_model', load_model=True)  # Set to False if you want to train
  coherence_result = coherence_pipeline.run_pipeline()
  coherence_score = coherence_result.loc['Overall', 'Coherence Score']

  final_score = user_satisfaction*coherence_score
  print(f"Final score: {final_score:.2f}")



if __name__ == "__main__":
  main()

Loaded 7442 labels from the files.
Dataset size before splitting: (7442, 2)
Model, scaler, label encoder, and feature names loaded successfully.
audio1.mp3 downloaded
    audio_file    emotion        level  confidence
0   audio1.mp3      Anger  Unspecified    0.832003
1   audio1.mp3    Disgust  Unspecified    0.084818
2   audio1.mp3       Fear  Unspecified    0.037596
3   audio1.mp3  Happiness  Unspecified    0.013726
4   audio1.mp3    Neutral  Unspecified    0.010456
5   audio1.mp3    Sadness  Unspecified    0.005088
6   audio1.mp3      Anger       Medium    0.003968
7   audio1.mp3    Sadness          Low    0.001975
8   audio1.mp3       Fear         High    0.001580
9   audio1.mp3      Anger         High    0.001542
10  audio1.mp3  Happiness         High    0.001529
11  audio1.mp3    Disgust         High    0.001103
12  audio1.mp3      Anger          Low    0.000811
13  audio1.mp3       Fear          Low    0.000716
14  audio1.mp3  Happiness       Medium    0.000674
15  audio1.mp3   

Attention type 'block_sparse' is not possible if sequence_length: 2 <= num global tokens: 2 * config.block_size + min. num sliding tokens: 3 * config.block_size + config.num_random_blocks * config.block_size + additional buffer: config.num_random_blocks * config.block_size = 704 with config.block_size = 64, config.num_random_blocks = 3. Changing attention type to 'original_full'...


Loaded model and tokenizer
  sentiment  confidence
0   neutral    0.414615
1  positive    0.296426
2  negative    0.288958
Final score (Emotion DataFrame mingyao): 1.29
Final score (Sentiment DataFrame bhavik): 5.00
Weighted score for user satisfaction: 3.14
Using existing model from ./coherence_model
        Pair Number                                            Context  \
0            Pair 1  ["AI: Hi, my name is Lila. I'm Octivo's AI age...   
1            Pair 2  "Caller: Hey, nice to meet you. My name is Mic...   
2            Pair 3  "AI: Thank you for introducing yourself Michae...   
3            Pair 4  "Caller: Yeah, sure. I'm 27 but I feel like I ...   
4            Pair 5  "AI: I completely understand your hesitation a...   
5            Pair 6  "Caller: Ok, that's fair enough. So I'm earnin...   
6            Pair 7  "AI: Thank you for sharing your income range t...   
7            Pair 8  "Caller: I will retire at around 65 and I woul...   
Overall                        
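
The last two steps of `main()` can be checked by hand: the audio and text scores are averaged using their total confidences as weights, and the result is multiplied by the overall coherence score. The sketch below mirrors that arithmetic. The function name `fuse_scores` and all input values are hypothetical and chosen only to make the calculation easy to follow.

```python
def fuse_scores(audio_score, audio_conf, text_score, text_conf):
    """Confidence-weighted average of two modality scores (0.0 if both weights are zero)."""
    total = audio_conf + text_conf
    if total == 0:
        return 0.0
    return (audio_score * audio_conf + text_score * text_conf) / total


# Hypothetical inputs: the text score carries three times the weight of the audio score
satisfaction = fuse_scores(audio_score=2.0, audio_conf=1.0, text_score=6.0, text_conf=3.0)

# Hypothetical overall coherence probability
coherence = 0.5
final = satisfaction * coherence

print(f"satisfaction={satisfaction:.2f}, final={final:.2f}")
```

Since the coherence score is an average of probabilities in the range 0 to 1, the multiplication can only keep or lower the satisfaction score; it never raises it.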

# Contextual Coherence (ignore - for record keeping)
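
The cell below scores each pair of adjacent dialogue lines, takes the softmax probability of class 1 as that pair's coherence score, and averages the pair scores into an overall score. A minimal sketch of that flow without the model, assuming made-up logits in place of the BigBird outputs; `adjacent_pairs`, `softmax`, and the sample dialogue are illustrative names and data, and skipping blank lines is an assumption about the input file:

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def adjacent_pairs(lines):
    """Pair each non-blank line with the non-blank line that follows it."""
    cleaned = [line.strip() for line in lines if line.strip()]
    return list(zip(cleaned, cleaned[1:]))


dialogue = ["AI: Hello.\n", "Caller: Hi there.\n", "\n", "AI: How can I help?\n"]
pairs = adjacent_pairs(dialogue)

# Made-up two-class logits, one list per pair, standing in for model outputs
fake_logits = [[0.0, 0.0], [0.0, math.log(3.0)]]

# Probability of class 1 is used as the pair's coherence score
scores = [softmax(logits)[1] for logits in fake_logits]
overall = sum(scores) / len(scores) if scores else 0.0

for (context, response), score in zip(pairs, scores):
    print(f"{context!r} -> {response!r}: {score:.2f}")
print(f"overall={overall:.3f}")
```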

In [None]:
# Import required libraries
from transformers import BigBirdForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset
import pandas as pd
from google.colab import files
import joblib
import os
import torch.nn.functional as F
from google.colab import drive
import gc

# Step 0: Safely handle mounting Google Drive
if not os.path.ismount('/content/drive'):
    drive.mount('/content/drive', force_remount=False)
    print("Drive mounted successfully.")
else:
    print("Google Drive is already mounted.")

# Clear cache and collect garbage to free memory
torch.cuda.empty_cache()
gc.collect()

# Step 1: Class for handling file operations and dataset management
class DatasetHandler:
    def __init__(self, save_directory):
        self.save_directory = save_directory
        self.train_dataset_file = os.path.join(self.save_directory, 'saved_train_dataset.pkl')
        self.dataframe = None

    def create_save_directory(self):
        if not os.path.exists(self.save_directory):
            os.makedirs(self.save_directory)
            print(f"Created directory: {self.save_directory}")

    def load_or_upload_dataset(self):
        if not os.path.exists(self.train_dataset_file):
            print("Training dataset file not found in Google Drive. Please upload the dataset.")
            uploaded = files.upload()
            file_name = list(uploaded.keys())[0]
            self.dataframe = pd.read_csv(file_name)
            joblib.dump(self.dataframe, self.train_dataset_file)
            print(f"Dataset saved to {self.train_dataset_file} in Google Drive.")
        else:
            # Load the dataset from Google Drive if it already exists
            self.dataframe = joblib.load(self.train_dataset_file)
            print(f"Dataset loaded from {self.train_dataset_file} in Google Drive.")
        return self.dataframe

# Step 2: Class for defining the custom dataset
class DialogueDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length):
        self.dataframe = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        context = self.dataframe.iloc[idx, 0]
        response = self.dataframe.iloc[idx, 1]
        label = self.dataframe.iloc[idx, 2]

        combined_text = context + " " + self.tokenizer.sep_token + " " + response
        encoding = self.tokenizer(
            combined_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        input_ids = encoding["input_ids"].squeeze(0)
        attention_mask = encoding["attention_mask"].squeeze(0)

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": torch.tensor(label, dtype=torch.long),
        }

# Custom Trainer class to handle non-contiguous tensor issue
class CustomTrainer(Trainer):
    def save_model(self, output_dir=None, _internal_call=False):
        # Make all tensors contiguous before saving
        for param in self.model.parameters():
            param.data = param.data.contiguous()
        super().save_model(output_dir, _internal_call=_internal_call)

# Step 3: Class for model training
class ModelTrainer:
    def __init__(self, model_name, train_dataset):
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = BigBirdForSequenceClassification.from_pretrained(self.model_name)
        self.train_dataset = train_dataset
        self.training_args = self._setup_training_args()

    def _setup_training_args(self):
        # Set up training arguments, limiting to 1 epoch for quick testing
        return TrainingArguments(
            output_dir='./results',
            num_train_epochs=1,  # Quick testing with 1 epoch
            per_device_train_batch_size=2,
            learning_rate=2e-5,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir='./logs',
            logging_steps=50,
            save_total_limit=2,
            save_steps=200,
            evaluation_strategy="no",
        )

    def fine_tune_model(self):
        trainer = CustomTrainer(
            model=self.model,
            args=self.training_args,
            train_dataset=self.train_dataset
        )
        trainer.train()
        return self.model

    def save_model(self, save_path):
        # Ensure all tensors are contiguous before saving
        for param in self.model.parameters():
            param.data = param.data.contiguous()
        self.model.save_pretrained(save_path)
        self.tokenizer.save_pretrained(save_path)
        print(f"Model saved to {save_path}")

# Step 4: Class for coherence evaluation with memory management
class CoherenceEvaluator:
    def __init__(self, model_path):
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = BigBirdForSequenceClassification.from_pretrained(model_path)

        # Move model to CPU to avoid GPU memory issues
        device = torch.device('cpu')
        self.model = self.model.to(device)

    def tokenize_input(self, context, response):
        return self.tokenizer(context, response, return_tensors='pt', max_length=1024, truncation=True, padding='max_length')

    def compute_logits(self, inputs):
        # Move inputs to the same device as the model
        inputs = {key: val.to(self.model.device) for key, val in inputs.items()}
        # Inference only: skip gradient tracking to reduce memory use
        with torch.no_grad():
            outputs = self.model(**inputs)
        return outputs.logits

    def apply_softmax(self, logits):
        probabilities = F.softmax(logits, dim=1)
        return probabilities[0][1].item()

# Step 5: Main pipeline class to encapsulate the entire process
class CoherencePipeline:
    def __init__(self, dataset_directory, model_name, model_save_path, train_model=True):
        self.dataset_directory = dataset_directory
        self.model_name = model_name
        self.model_save_path = model_save_path
        self.train_model = train_model
        self.dataset_handler = DatasetHandler(dataset_directory)
        self.model_trainer = None
        self.coherence_evaluator = None

    def prepare_dataset(self):
        self.dataset_handler.create_save_directory()
        df = self.dataset_handler.load_or_upload_dataset()
        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        train_dataset = DialogueDataset(df, tokenizer, max_length=256)
        return train_dataset

    def train_and_save_model(self, train_dataset):
        self.model_trainer = ModelTrainer(self.model_name, train_dataset)
        trained_model = self.model_trainer.fine_tune_model()
        self.model_trainer.save_model(self.model_save_path)
        return trained_model

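    def evaluate_coherence(self, dialogue_file_path):
        # Use the given file if it exists; otherwise prompt for an upload
        if os.path.exists(dialogue_file_path):
            file_name = dialogue_file_path
        else:
            print("Please upload the test file for evaluation:")
            uploaded = files.upload()
            file_name = list(uploaded.keys())[0]
        with open(file_name, 'r') as file:
            dialogue = file.readlines()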

        self.coherence_evaluator = CoherenceEvaluator(self.model_save_path)
        pairs = [(dialogue[i].strip(), dialogue[i + 1].strip()) for i in range(len(dialogue) - 1)]

        scores = []
        for context, response in pairs:
            inputs = self.coherence_evaluator.tokenize_input(context, response)
            logits = self.coherence_evaluator.compute_logits(inputs)
            score = self.coherence_evaluator.apply_softmax(logits)
            scores.append(score)

        # Create DataFrame to store results
        df_results = pd.DataFrame({
            'Pair Number': [f'Pair {i+1}' for i in range(len(pairs))],
            'Context': [pair[0] for pair in pairs],
            'Response': [pair[1] for pair in pairs],
            'Coherence Score': scores
        })

        # Calculate overall coherence score (0.0 if the file has fewer than two lines)
        overall_score = sum(scores) / len(scores) if scores else 0.0
        df_results.loc['Overall'] = ['', '', 'Overall Coherence Score', overall_score]

        return df_results

    def run_pipeline(self):
        if self.train_model:
            # Train model if flag is set to True
            train_dataset = self.prepare_dataset()
            self.train_and_save_model(train_dataset)
        else:
            # Check if using a pretrained model from Hugging Face
            if self.model_save_path.startswith("google/"):
                print(f"Using pretrained model from Hugging Face: {self.model_save_path}")
            else:
                # Check if fine-tuned model exists locally
                if not os.path.exists(self.model_save_path):
                    raise FileNotFoundError(f"No fine-tuned model found at {self.model_save_path}. Please train the model first.")
                print(f"Using existing model from {self.model_save_path}")

        # Proceed to evaluate test data and get DataFrame
        df_results = self.evaluate_coherence('your_dialogue_test_file.txt')
        print(df_results)
        return df_results

# Step 6: Run the pipeline
pipeline = CoherencePipeline(
    dataset_directory='/content/drive/MyDrive/Coherence_Model',
    model_name="google/bigbird-roberta-base",
    model_save_path="google/bigbird-roberta-base",  # Pretrained base checkpoint: its classifier head is untrained, so fine-tune before trusting the scores
    train_model=False  # Set to True if you want to train, False to use existing model
)

# Run the pipeline
df_results = pipeline.run_pipeline()


Mounted at /content/drive
Drive mounted successfully.
Using pretrained model from Hugging Face: google/bigbird-roberta-base
Please upload the test file for evaluation:


Saving dialogue1.txt to dialogue1.txt


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Some weights of BigBirdForSequenceClassification were not initialized from the model checkpoint at google/bigbird-roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


        Pair Number                                            Context  \
0            Pair 1  ["AI: Hi, my name is Lila. I'm Octivo's AI age...   
1            Pair 2  "Caller: Hey, nice to meet you. My name is Mic...   
2            Pair 3  "AI: Thank you for introducing yourself Michae...   
3            Pair 4  "Caller: Yeah, sure. I'm 27 but I feel like I ...   
4            Pair 5  "AI: I completely understand your hesitation a...   
5            Pair 6  "Caller: Ok, that's fair enough. So I'm earnin...   
6            Pair 7  "AI: Thank you for sharing your income range t...   
7            Pair 8  "Caller: I will retire at around 65 and I woul...   
Overall                                                                  

                                                  Response  Coherence Score  
0        "Caller: Hey, nice to meet you. My name is Mic...         0.566975  
1        "AI: Thank you for introducing yourself Michae...         0.552891  
2        "Caller: Yeah, s