# Classify 
This notebook combines all the other solutions to determine whether a caller is angry and, if so, which agent the angry caller will be assigned to.

In [1]:
import nbformat
from IPython import get_ipython
from IPython.core.interactiveshell import InteractiveShell

def import_from_notebook(notebook_path, function_name):
    """Run every code cell of another notebook and return one name it defines.

    The code cells of ``notebook_path`` are concatenated and executed inside
    the current IPython user namespace, so everything the notebook defines
    (not only ``function_name``) becomes visible in this kernel.

    Args:
        notebook_path: Path to the ``.ipynb`` file to execute.
        function_name: Name to look up in the namespace after execution.

    Returns:
        The object bound to ``function_name`` after the notebook code has run.

    Raises:
        KeyError: If the executed code does not define ``function_name``.
    """
    with open(notebook_path, 'r', encoding='utf-8') as f:
        nb = nbformat.read(f, as_version=4)

    shell = InteractiveShell.instance()
    code = "\n".join(cell.source for cell in nb.cells if cell.cell_type == 'code')
    # NOTE: exec runs the notebook's code with this kernel's full privileges;
    # only point this at notebooks you trust.
    exec(code, shell.user_ns)
    try:
        return shell.user_ns[function_name]
    except KeyError:
        # Same exception type as before, but say what was missing and where.
        raise KeyError(
            f"{function_name!r} was not defined by any code cell of {notebook_path!r}"
        ) from None

In [2]:
# Executes the code cells of 'Text Semantic Analysis.ipynb' in this kernel's namespace
# and binds the is_caller_angry function that notebook defines.
is_caller_angry = import_from_notebook('Text Semantic Analysis.ipynb', 'is_caller_angry')

pygame 2.5.2 (SDL 2.28.3, Python 3.9.16)
Hello from the pygame community. https://www.pygame.org/contribute.html


Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [3]:
# Import necessary libraries
import os
import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.models import load_model
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Load and process the audio files in a folder
def load_audio_file(file_path):
    """Read a 2.5-second excerpt of an audio file, skipping its first 0.6 seconds.

    Returns:
        The ``(samples, sampling_rate)`` pair produced by ``librosa.load``.
    """
    return librosa.load(file_path, offset=0.6, duration=2.5)

# Feature extraction function
def extract_features(data, sample_rate):
    """Build one flat feature vector for an audio signal.

    Concatenates, in this order, the averaged zero-crossing rate, chroma STFT,
    MFCC, RMS and mel-spectrogram features computed by librosa.

    Args:
        data: Audio samples as accepted by the librosa feature functions.
        sample_rate: Sampling rate passed to the rate-dependent features.

    Returns:
        A 1-D NumPy array holding all averaged features back to back.
    """
    def averaged(feature_matrix):
        # Transpose and average along axis 0
        # (presumably the time frames — TODO confirm).
        return np.mean(feature_matrix.T, axis=0)

    magnitude_stft = np.abs(librosa.stft(data))

    feature_groups = (
        # Leading empty array (NumPy's default float dtype) takes part in the
        # dtype promotion of the stacked result.
        np.array([]),
        averaged(librosa.feature.zero_crossing_rate(y=data)),                    # Zero Crossing Rate
        averaged(librosa.feature.chroma_stft(S=magnitude_stft, sr=sample_rate)),  # Chroma STFT
        averaged(librosa.feature.mfcc(y=data, sr=sample_rate)),                  # MFCC
        averaged(librosa.feature.rms(y=data)),                                   # Root Mean Square Value
        averaged(librosa.feature.melspectrogram(y=data, sr=sample_rate)),        # Mel Spectrogram
    )
    return np.hstack(feature_groups)

# Function to preprocess and standardize data
def preprocess_data(features, scaler):
    """Scale one feature vector and shape it as a single-sample model input.

    Args:
        features: NumPy array holding the features of one sample.
        scaler: Object exposing ``transform`` (e.g. a fitted scaler).

    Returns:
        The scaled features with a new axis inserted at position 2; for a
        shape-preserving scaler this is ``(1, n_features, 1)``.
    """
    single_row = features.reshape(1, -1)
    return np.expand_dims(scaler.transform(single_row), axis=2)

In [4]:
# Load the saved model
model_emotion = load_model('emotion_recognition_model.h5')

# Fitted (scaler, label encoder) pairs keyed by training-CSV path, so the CSV is
# read and the preprocessors are fitted once rather than on every prediction.
_emotion_preprocessors = {}


def _get_emotion_preprocessors(features_csv='features.csv'):
    """Return the (scaler, label_encoder) pair fitted on ``features_csv``.

    The scaler is fitted on every column except the last one and the encoder on
    the ``labels`` column (presumably the last column — TODO confirm).
    """
    if features_csv not in _emotion_preprocessors:
        training = pd.read_csv(features_csv)
        scaler = StandardScaler().fit(training.iloc[:, :-1].values)
        label_encoder = OneHotEncoder().fit(training['labels'].values.reshape(-1, 1))
        _emotion_preprocessors[features_csv] = (scaler, label_encoder)
    return _emotion_preprocessors[features_csv]


def get_emotion(file_path):
    """Predict the emotion label of one audio file with ``model_emotion``.

    Args:
        file_path: Path to the audio file to classify.

    Returns:
        The category from the label encoder that corresponds to the
        highest-scoring model output.
    """
    # Scaler and label encoder refitted from the training features (cached)
    scaler, label_encoder = _get_emotion_preprocessors()

    # Load and preprocess the audio file
    data, sampling_rate = load_audio_file(file_path)
    features = extract_features(data, sampling_rate)
    processed_features = preprocess_data(features, scaler)

    # Make a prediction
    prediction = model_emotion.predict(processed_features)
    predicted_label = np.argmax(prediction, axis=1)

    # Map the predicted label to the corresponding emotion
    return label_encoder.categories_[0][predicted_label[0]]


In [5]:
# Accent label -> wording used for the agent in the assignment message.
# The keys are looked up with the accent classifier's predicted label, so they
# must stay exactly as spelled here.
_AGENT_WORDING_BY_ACCENT = {
    'african': 'African',
    'australia': 'Australian',
    'bermuda': 'Bermudian',
    'canada': 'Canadian',
    'england': 'English',
    'hongkong': 'Hong Kong',
    'indian': 'Indian',
    'ireland': 'Irish',
    'malaysia': 'Malaysian',
    'newzealand': 'New Zealand',
    'philippines': 'Filipino',
    'scotland': 'Scottish',
    'singapore': 'Singaporean',
    'southatlandtic': 'South Atlantic',
    'us': 'US',
    'wales': 'Welsh',
}

# Accent label -> full assignment message shown for an angry caller.
agent_country_dict = {
    accent: f'Caller assigned to {wording} agent.'
    for accent, wording in _AGENT_WORDING_BY_ACCENT.items()
}

In [6]:
import os
import numpy as np
import torch
import torch.nn as nn
from transformers import Wav2Vec2Processor, Wav2Vec2Model, Wav2Vec2Config, Wav2Vec2PreTrainedModel
import soundfile as sf
import torchaudio

# Define the ModelHead and AgeGenderModel classes
class ModelHead(nn.Module):
    """Small prediction head: dropout -> linear -> tanh -> dropout -> linear.

    Args:
        config: Object providing ``hidden_size`` and ``final_dropout``.
        num_labels: Number of outputs produced by the final projection.
    """

    def __init__(self, config, num_labels):
        super().__init__()
        hidden_size = config.hidden_size
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(hidden_size, num_labels)

    def forward(self, features, **kwargs):
        """Map ``features`` to ``num_labels`` outputs; extra keyword arguments are ignored."""
        projected = self.dense(self.dropout(features))
        activated = self.dropout(torch.tanh(projected))
        return self.out_proj(activated)

class AgeGenderModel(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 encoder with two ``ModelHead`` outputs: age (1 value) and gender (3 values)."""

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.wav2vec2 = Wav2Vec2Model(config)
        self.age = ModelHead(config, 1)
        self.gender = ModelHead(config, 3)
        self.init_weights()

    def forward(self, input_values):
        """Return ``(logits_age, logits_gender)`` for a batch of input values.

        The first element of the encoder output is averaged over dimension 1
        (presumably the time axis — TODO confirm) before both heads are applied.
        """
        encoder_outputs = self.wav2vec2(input_values)
        pooled = encoder_outputs[0].mean(dim=1)
        return self.age(pooled), self.gender(pooled)

# Load model and processor
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_name = 'audeering/wav2vec2-large-robust-24-ft-age-gender'
processor = Wav2Vec2Processor.from_pretrained(model_name)

# Load the saved age and gender model
cpu_device = torch.device('cpu')
age_gender_model_path = 'quantized_age_gender_model.pth'
config = Wav2Vec2Config.from_pretrained(model_name)
model = AgeGenderModel(config)
# NOTE(review): strict=False silently skips keys that do not match the model;
# confirm the saved state dict really covers every weight that is needed.
model.load_state_dict(torch.load(age_gender_model_path, map_location=cpu_device), strict=False)
model.to(device)
# A module built through its constructor starts in training mode; switch to
# evaluation mode so dropout is disabled and predictions are deterministic.
model.eval()

# Ensure sampling rate is 16,000 Hz
TARGET_SAMPLING_RATE = 16000

def resample_audio(signal, orig_sr, target_sr):
    """Return ``signal`` as a float32 NumPy array sampled at ``target_sr``.

    Args:
        signal: Audio samples (NumPy array or anything ``torch.as_tensor`` accepts).
        orig_sr: Sampling rate of ``signal``.
        target_sr: Desired sampling rate.

    Returns:
        A float32 NumPy array; resampled with torchaudio when the rates differ,
        otherwise the input samples converted to float32.
    """
    if orig_sr == target_sr:
        # Nothing to resample. The input is a NumPy array (no ``.numpy()``
        # method), so convert it directly instead of going through torch.
        return np.asarray(signal, dtype=np.float32)
    resampler = torchaudio.transforms.Resample(orig_sr, target_sr)
    return resampler(torch.as_tensor(signal, dtype=torch.float32)).numpy()

def normalize_audio(signal):
    """Standardize ``signal`` to zero mean and unit standard deviation.

    A signal with zero standard deviation (e.g. pure silence) is only centered,
    which avoids dividing by zero and filling the result with NaN.

    Args:
        signal: NumPy array of audio samples.

    Returns:
        The standardized array, or the centered array when the standard
        deviation is zero.
    """
    centered = signal - np.mean(signal)
    spread = np.std(signal)
    if spread == 0:
        return centered
    return centered / spread

def process_func(file_path: str):
    """Predict the age and gender of the speaker in an audio file.

    The file is read with soundfile, averaged to one channel when it has
    several, resampled to ``TARGET_SAMPLING_RATE``, normalized, and passed
    through the processor and the age/gender model.

    Args:
        file_path: Path to the audio file.

    Returns:
        ``(age, gender, gender_probs)``: the age logit times 100 rounded to an
        int, one of ``'female'``/``'male'``/``'child'``, and the softmax
        probabilities over those three labels in that order.
    """
    waveform, source_rate = sf.read(file_path)
    if waveform.ndim > 1:
        waveform = waveform.mean(axis=1)  # Convert to mono
    waveform = normalize_audio(resample_audio(waveform, source_rate, TARGET_SAMPLING_RATE))

    model_inputs = processor(
        waveform, sampling_rate=TARGET_SAMPLING_RATE, return_tensors="pt", padding=True
    ).to(device)

    with torch.no_grad():
        age_logits, gender_logits = model(model_inputs['input_values'])

    # Apply scaling to the age logits
    # (assuming a scale factor of 100 for interpretation)
    age = round(age_logits.item() * 100)

    gender_probs = torch.softmax(gender_logits, dim=1).cpu().numpy()[0]
    gender = ('female', 'male', 'child')[np.argmax(gender_probs)]

    return age, gender, gender_probs

import torch
import torchaudio
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification

# Load the saved model for accent classification
# NOTE: this rebinds model_name, replacing the age/gender checkpoint name assigned
# earlier in this cell. The following cell repeats this same loading sequence.
model_name = "english_accents_classification"
model_endorse = AutoModelForAudioClassification.from_pretrained(model_name)

# Move the model to the appropriate device (CPU or GPU)
model_endorse.to(device)

# Load the feature extractor
# NOTE(review): the extractor is loaded from "dima806/english_accents_classification"
# while the model is loaded from "english_accents_classification" — confirm both
# refer to the same checkpoint.
feature_extractor = AutoFeatureExtractor.from_pretrained("dima806/english_accents_classification")

  device=storage.device,


In [7]:
import torch
import torchaudio
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification, pipeline, TrainingArguments, Trainer

# Load the saved model
# NOTE: the previous cell already performs this same model and feature-extractor
# loading; running both cells loads them twice.
model_name = "english_accents_classification"
model_endorse = AutoModelForAudioClassification.from_pretrained(model_name)

# Move the model to the appropriate device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_endorse.to(device)

# Load the feature extractor
# NOTE(review): loaded from "dima806/english_accents_classification" whereas the
# model comes from "english_accents_classification" — confirm they match.
feature_extractor = AutoFeatureExtractor.from_pretrained("dima806/english_accents_classification")

def endorse_to_agent(file_path):
    """Choose the agent profile an angry caller should be routed to.

    The accent classifier picks the agent's region; the age/gender model picks
    the agent's age bracket and gender (the agent gender is the opposite of the
    caller's, with a caller classified as 'child' getting a female agent).

    Args:
        file_path: Path to the caller's audio file.

    Returns:
        A tuple of two messages: the region assignment taken from
        ``agent_country_dict`` and the age/gender assignment.

    Raises:
        KeyError: If the predicted accent label is not a key of
            ``agent_country_dict``.
    """
    # The extractor is told the audio is at this rate, so resample to exactly
    # this rate rather than to a separately hard-coded constant.
    target_sample_rate = feature_extractor.sampling_rate

    audio, rate = torchaudio.load(file_path)
    # torchaudio returns (channels, frames): average the channels so that a
    # multi-channel file becomes one mono signal instead of its channels being
    # concatenated end to end by the flatten below. Mono input is unchanged.
    audio = audio.mean(dim=0, keepdim=True)
    if rate != target_sample_rate:
        audio = torchaudio.transforms.Resample(rate, target_sample_rate)(audio)
    audio = audio.numpy().reshape(-1)

    # Preprocess the audio
    inputs = feature_extractor(audio, sampling_rate=target_sample_rate, return_tensors="pt")

    # Move inputs to the same device as the model
    inputs = {key: value.to(device) for key, value in inputs.items()}

    # Make predictions
    with torch.no_grad():
        outputs = model_endorse(**inputs)

    # Get predicted class from the logits (raw predictions)
    predicted_class_id = outputs.logits.argmax().item()
    predicted_class_label = model_endorse.config.id2label[predicted_class_id]

    # Age and Gender prediction
    age, gender, gender_probs = process_func(file_path)

    # Gender-swap endorsement logic: female callers get a male agent, while
    # male and child callers get a female agent.
    agent_gender = 'male' if gender == 'female' else 'female'

    # Age-based endorsement logic: first bracket whose upper bound covers the age
    agent_age = 'above 60'
    for upper_bound in (30, 40, 50, 60):
        if age <= upper_bound:
            agent_age = f'{upper_bound} and below'
            break

    agent_age_gender_info = f"Caller assigned to age {agent_age}, {agent_gender} agent."

    return agent_country_dict[predicted_class_label], agent_age_gender_info

In [8]:
# Runs the caller through both checks: transcript sentiment and voice emotion.

def verify_customer_is_calm(file_path):
    """Classify a caller as angry or calm and route angry callers to an agent.

    The caller counts as angry when the voice emotion is ``'angry'`` or the
    transcript check returns ``'negative'``. Note that, despite the function
    name, the return value is 1 for an angry caller and 0 for a calm one.

    Args:
        file_path: Path to the caller's audio file.

    Returns:
        1 if the caller is angry (the agent assignment is printed), else 0.
    """
    transcript_result = is_caller_angry(file_path)
    emotion = get_emotion(file_path)

    caller_is_calm = emotion != 'angry' and transcript_result != 'negative'
    if caller_is_calm:
        print('customer is calm')
        return 0

    endorsement = endorse_to_agent(file_path)
    print('customer is angry')
    print(endorsement)
    return 1

### verify_customer_is_calm (Neural Network Version)

In [9]:
# Import necessary libraries
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
import seaborn as sns
from keras.callbacks import ReduceLROnPlateau
from keras.models import Sequential
from keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, Dropout
import os
import pandas as pd
import warnings

# Suppress warnings
# Only DeprecationWarning is silenced here; other warning categories are still shown.
warnings.filterwarnings("ignore", category=DeprecationWarning) 

In [10]:
# Audio data augmentation functions
def noise(data):
    """Return ``data`` with random Gaussian noise added.

    The noise amplitude is a random fraction (at most 3.5%) of the largest
    sample value; randomness comes from NumPy's global random state.
    """
    amplitude = 0.035 * np.random.uniform() * np.max(data)
    return data + amplitude * np.random.normal(size=data.shape[0])

def stretch(data, rate=0.8):
    """Time-stretch ``data`` by ``rate`` using ``librosa.effects.time_stretch``."""
    stretched = librosa.effects.time_stretch(data, rate=rate)
    return stretched

def shift(data):
    """Circularly shift ``data`` by a random offset.

    The offset is drawn from NumPy's global random state as a uniform value in
    [-5, 5) multiplied by 1000 and truncated to an integer number of samples.
    """
    offset = int(np.random.uniform(-5, 5) * 1000)
    return np.roll(data, offset)

def pitch(data, sampling_rate, pitch_factor=0.7):
    """Pitch-shift ``data`` by ``pitch_factor`` steps using ``librosa.effects.pitch_shift``."""
    shifted = librosa.effects.pitch_shift(data, sr=sampling_rate, n_steps=pitch_factor)
    return shifted

# Feature extraction function
def extract_features(data, sample_rate):
    """Build one flat feature vector for an audio signal.

    This redefines the function of the same name from the earlier
    feature-extraction cell with the same feature set: averaged zero-crossing
    rate, chroma STFT, MFCC, RMS and mel-spectrogram values, in that order.

    Args:
        data: Audio samples as accepted by the librosa feature functions.
        sample_rate: Sampling rate passed to the rate-dependent features.

    Returns:
        A 1-D NumPy array holding all averaged features back to back.
    """
    def averaged(feature_matrix):
        # Transpose and average along axis 0
        # (presumably the time frames — TODO confirm).
        return np.mean(feature_matrix.T, axis=0)

    magnitude_stft = np.abs(librosa.stft(data))

    feature_groups = (
        # Leading empty array (NumPy's default float dtype) takes part in the
        # dtype promotion of the stacked result.
        np.array([]),
        averaged(librosa.feature.zero_crossing_rate(y=data)),                    # Zero Crossing Rate
        averaged(librosa.feature.chroma_stft(S=magnitude_stft, sr=sample_rate)),  # Chroma STFT
        averaged(librosa.feature.mfcc(y=data, sr=sample_rate)),                  # MFCC
        averaged(librosa.feature.rms(y=data)),                                   # Root Mean Square Value
        averaged(librosa.feature.melspectrogram(y=data, sr=sample_rate)),        # Mel Spectrogram
    )
    return np.hstack(feature_groups)

# Function to get features from an audio file path
def get_features(path):
    """Extract feature vectors for an audio file and two augmented copies of it.

    A 2.5-second excerpt starting 0.6 s into the file is loaded, then features
    are extracted from the plain signal, a noisy copy, and a copy that is
    time-stretched and then pitch-shifted.

    Args:
        path: Path to the audio file.

    Returns:
        A 2-D NumPy array with one row per variant, in the order
        original, noisy, stretched-and-pitched.
    """
    data, sample_rate = librosa.load(path, duration=2.5, offset=0.6)

    variants = (
        data,                               # without augmentation
        noise(data),                        # data with noise
        pitch(stretch(data), sample_rate),  # data with stretching and pitching
    )
    return np.vstack([extract_features(variant, sample_rate) for variant in variants])

In [11]:
#imported necessary packages
from pydub import AudioSegment
import pygame

#defined function that plays an audio file

def play_audio(file_path):
    """Play an audio file through pygame and block until playback has finished.

    A ``pygame.error`` raised while loading or playing is reported with a
    printed message instead of being propagated. pygame is always shut down
    afterwards, even if playback is interrupted by another exception.

    Args:
        file_path: Path to the audio file to play.
    """
    pygame.init()
    pygame.mixer.init()
    try:
        pygame.mixer.music.load(file_path)  # loads the audio file
        pygame.mixer.music.play()  # plays the audio file
        # One clock for the whole wait loop instead of a new one per iteration.
        clock = pygame.time.Clock()
        # Wait until playback has finished before continuing with the program.
        while pygame.mixer.music.get_busy():
            clock.tick(10)
    except pygame.error as e:
        print("Error occurred while playing audio:", e)  # reports the failure without raising
    finally:
        # Release pygame resources on every exit path (e.g. KeyboardInterrupt).
        pygame.quit()
    
#defined function that can play audio through the speakers (or headphones)

def play(input_file):
    """Play ``input_file`` by delegating to ``play_audio``; returns nothing."""
    # Thin wrapper: all playback and error handling happens in play_audio.
    play_audio(input_file)
    
#imported necessary packages
from transformers import pipeline

# Speech-to-text pipeline built on the "openai/whisper-small" model; calling
# transcriber(path) on an audio file returns its transcription.
transcriber = pipeline(task="automatic-speech-recognition", model="openai/whisper-small")

# logging is used to record transcripts and errors in a log file
import logging

# NOTE(review): ffmpeg is not referenced by name in the visible cells — confirm
# whether this import is still required.
import ffmpeg

# Write INFO-and-above log records to transcript.log as "time - level - message"
logging.basicConfig(filename='transcript.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Transcribes an audio file with the transcriber pipeline.
# If transcription fails, the error is logged and printed and None is returned.
def transcribe(input_file):
    """Transcribe an audio file, logging and printing the result.

    Args:
        input_file: Path to the audio file to transcribe.

    Returns:
        Whatever ``transcriber`` returns for the file, or ``None`` when
        transcription raised an exception.
    """
    print("Transcribing. Please wait...")
    # Bound up front so the return below is valid even when transcription fails.
    transcript = None
    try:
        transcript = transcriber(input_file)  # calls the transcriber
        logging.info("Transcript: %s", transcript)  # logs the transcript
        print("Transcript:", transcript)  # prints the transcript
    except Exception as e:
        logging.error("Error occurred: %s", e)  # log the error
        print("Error occurred:", e)  # print the error message

    return transcript

# Sentiment classifier, created on first use and then reused, so the model is
# not reloaded for every call.
_sentiment_classifier = None


def _get_sentiment_classifier():
    """Return the shared text-classification pipeline, creating it on first use."""
    global _sentiment_classifier
    if _sentiment_classifier is None:
        from transformers import pipeline

        _sentiment_classifier = pipeline("text-classification", model="cardiffnlp/twitter-roberta-base-sentiment-latest")
    return _sentiment_classifier


# Scores the sentiment of what the caller said in an audio file.
def get_semantic_score(input_file):
    """Transcribe an audio file and classify the sentiment of its transcript.

    Args:
        input_file: Path to the audio file.

    Returns:
        ``(score, sentiment_label)`` taken from the classifier's first result.
        When transcription or classification fails, the caller is
        conservatively treated as negative and ``(0.0, 'negative')`` is returned.
    """
    transcript = transcribe(input_file)
    classify_sentiment = _get_sentiment_classifier()

    try:
        top_prediction = classify_sentiment(transcript['text'])[0]
        return top_prediction['score'], top_prediction['label']
    except Exception as e:
        # Covers a missing transcript as well as a classifier failure; 0.0 is a
        # placeholder score so callers always receive a number.
        logging.error("Sentiment classification failed: %s", e)
        return 0.0, 'negative'
    

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [12]:
# Neural-network version: combines audio features with the transcript sentiment to decide whether the caller is angry

from tensorflow.keras.models import load_model
import numpy as np
import os
import pandas as pd
import tensorflow as tf

# Load the best model
# (load_model here is the tensorflow.keras function imported in this cell)
best_model = load_model('best_model.keras')

# Load the CSV file
# The whole file is read into a module-level DataFrame when this cell runs.
csv_file = 'recordings_for_testing.csv'
df = pd.read_csv(csv_file)

def nn_verify_customer_is_calm(file_path):
    """Classify a caller as angry or calm with ``best_model`` and route angry callers.

    The model input has one row per feature vector returned by
    ``get_features``; each row is the audio features followed by the sentiment
    score and the sentiment label encoded as a number. No ground-truth label
    is needed, so files that are not listed in the testing CSV can be
    classified too.

    Note that, despite the function name, the return value is 1 for an angry
    caller and 0 for a calm one.

    Args:
        file_path: Path to the caller's audio file.

    Returns:
        1 if the caller is classified as angry (the agent assignment is
        printed), else 0.

    Raises:
        ValueError: If the sentiment label is not neutral/negative/positive.
    """
    # Audio features: one row per (augmented) variant of the recording
    audio_features = np.asarray(get_features(file_path), dtype=np.float32)
    n_rows = audio_features.shape[0]

    # Transcript sentiment
    score, sentiment_label = get_semantic_score(file_path)

    # Mapping sentiment labels to integers
    sentiment_mapping = {'neutral': 0, 'negative': 1, 'positive': 2}
    if sentiment_label not in sentiment_mapping:
        raise ValueError(f"Unexpected sentiment label: {sentiment_label!r}")
    sentiment_id = sentiment_mapping[sentiment_label]

    # Repeat the sentiment score and label so every feature row carries them
    sentiment_columns = np.tile(
        np.array([[score, sentiment_id]], dtype=np.float32), (n_rows, 1)
    )

    # Combine sentiment score and label with audio features
    combined_features_tensor = tf.convert_to_tensor(
        np.hstack([audio_features, sentiment_columns]), dtype=tf.float32
    )

    # Use the model to make predictions
    predictions = best_model.predict(combined_features_tensor)
    # NOTE(review): int() truncates, so only a maximum in [1, 2) counts as
    # angry — confirm this matches the model's output range.
    label = int(np.max(predictions))

    if label == 1:
        endorsement = endorse_to_agent(file_path)
        print('customer is angry')
        print(endorsement)
        return 1
    else:
        print('customer is calm')
        return 0


In [16]:
# Test - Angry caller: English 30s male
# A printed return value of 1 means the caller was classified as angry.
print(nn_verify_customer_is_calm('all_recordings/angry1.wav'))

Transcribing. Please wait...
Transcript: {'text': ' Oh, you got go'}


Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step
customer is angry
('Caller assigned to English agent.', 'Caller assigned to age 30 and below, female agent.')
1


In [17]:
# Test - Angry caller: English 30s male
# NOTE(review): this description is identical to the angry1 test — confirm it
# also describes angry22.wav.
# A printed return value of 1 means the caller was classified as angry.
print(nn_verify_customer_is_calm('all_recordings/angry22.wav'))

Transcribing. Please wait...
Transcript: {'text': ' I hate you.'}


Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step
customer is angry
('Caller assigned to English agent.', 'Caller assigned to age 30 and below, female agent.')
1


In [18]:
# Test - Calm caller: English 20s female
# A printed return value of 0 means the caller was classified as calm.
print(nn_verify_customer_is_calm('all_recordings/10.wav'))

Transcribing. Please wait...
Transcript: {'text': ' My issue has not been resolved. What are the next steps?'}


Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step
customer is calm
0
