In [1]:
import pandas as pd
import numpy as np
import os
import seaborn as sns
import matplotlib.pyplot as plt
import librosa
import librosa.display
from IPython.display import Audio
import torchaudio
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import Wav2Vec2Model, Wav2Vec2Processor, Trainer, TrainingArguments, Wav2Vec2ForSequenceClassification
import warnings
warnings. filterwarnings('ignore')

In [2]:
paths = []
labels = []
for dirname, _, filenames in os.walk('//Users/mac/Documents/speech_model/dataset'):
    for filename in filenames:
        paths.append(os.path.join(dirname, filename))
        label = filename.split('_')[-1]
        label = label.split('.')[0]
        labels.append(label.lower())
    if len(paths) == 2800:
        break
print('Dataset is Loaded')

Dataset is Loaded


In [None]:
# Number of file paths collected by the loading cell.
len(paths)

In [None]:
# Peek at the first five collected file paths.
paths[:5]

In [None]:
# Peek at the first five labels parsed from the file names.
labels[:5]

In [3]:
## Create a dataframe
# One row per collected file: its path ('speech') and the label parsed
# from its file name ('label').
df = pd.DataFrame({'speech': paths, 'label': labels})
df.head()

Unnamed: 0,speech,label
0,//Users/mac/Documents/speech_model/dataset/TES...,disgust
1,//Users/mac/Documents/speech_model/dataset/TES...,disgust
2,//Users/mac/Documents/speech_model/dataset/TES...,disgust
3,//Users/mac/Documents/speech_model/dataset/TES...,disgust
4,//Users/mac/Documents/speech_model/dataset/TES...,disgust


In [None]:
# Number of rows per label value.
df['label'].value_counts()

## Exploratory Data Analysis

In [None]:
# Bar chart of the number of rows per label value.
sns.countplot(data=df, x='label')

In [12]:
def waveplot(data, sr, emotion):
    """Plot the waveform of an audio signal and show the figure.

    Args:
        data: Audio samples handed to ``librosa.display.waveshow``.
        sr: Sampling rate of ``data``.
        emotion: Text used as the figure title.
    """
    figure_size = (10, 4)
    title_size = 20
    plt.figure(figsize=figure_size)
    plt.title(emotion, size=title_size)
    librosa.display.waveshow(data, sr=sr)
    plt.show()
    
def spectogram(data, sr, emotion):
    """Plot the dB-scaled STFT magnitude of an audio signal with a colorbar.

    Args:
        data: Audio samples handed to ``librosa.stft``.
        sr: Sampling rate of ``data``, used for the axis scaling.
        emotion: Text used as the figure title.
    """
    # Magnitude of the short-time Fourier transform, converted to decibels.
    magnitude = np.abs(librosa.stft(data))
    magnitude_db = librosa.amplitude_to_db(magnitude)

    plt.figure(figsize=(11, 4))
    plt.title(emotion, size=20)
    librosa.display.specshow(magnitude_db, sr=sr, x_axis='time', y_axis='hz')
    plt.colorbar()

In [None]:
# Inspect the first 'fear' recording: waveform, spectrogram and playback.
emotion = 'fear'
path = df.loc[df['label'] == emotion, 'speech'].to_numpy()[0]
data, sampling_rate = librosa.load(path)
waveplot(data, sampling_rate, emotion)
spectogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Inspect the second 'angry' recording: waveform, spectrogram and playback.
emotion = 'angry'
path = df.loc[df['label'] == emotion, 'speech'].to_numpy()[1]
data, sampling_rate = librosa.load(path)
waveplot(data, sampling_rate, emotion)
spectogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Inspect the first 'disgust' recording: waveform, spectrogram and playback.
emotion = 'disgust'
path = df.loc[df['label'] == emotion, 'speech'].to_numpy()[0]
data, sampling_rate = librosa.load(path)
waveplot(data, sampling_rate, emotion)
spectogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Inspect the first 'neutral' recording: waveform, spectrogram and playback.
emotion = 'neutral'
path = df.loc[df['label'] == emotion, 'speech'].to_numpy()[0]
data, sampling_rate = librosa.load(path)
waveplot(data, sampling_rate, emotion)
spectogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Inspect the first 'sad' recording: waveform, spectrogram and playback.
emotion = 'sad'
path = df.loc[df['label'] == emotion, 'speech'].to_numpy()[0]
data, sampling_rate = librosa.load(path)
waveplot(data, sampling_rate, emotion)
spectogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Inspect the first 'ps' recording: waveform, spectrogram and playback.
emotion = 'ps'
path = df.loc[df['label'] == emotion, 'speech'].to_numpy()[0]
data, sampling_rate = librosa.load(path)
waveplot(data, sampling_rate, emotion)
spectogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Inspect the first 'happy' recording: waveform, spectrogram and playback.
emotion = 'happy'
path = df.loc[df['label'] == emotion, 'speech'].to_numpy()[0]
data, sampling_rate = librosa.load(path)
waveplot(data, sampling_rate, emotion)
spectogram(data, sampling_rate, emotion)
Audio(path)

Create Custom Dataset Class

In [4]:
# Encode the string labels as integer class ids.
# Ids follow the order in which each label first appears in df, so the
# mapping depends on the order the files were collected in.
unique_labels = df['label'].unique()
label_map = dict(zip(unique_labels, range(len(unique_labels))))
inverse_label_map = dict(enumerate(unique_labels))
df['label'] = df['label'].map(label_map)
df.head(2)


Unnamed: 0,speech,label
0,//Users/mac/Documents/speech_model/dataset/TES...,0
1,//Users/mac/Documents/speech_model/dataset/TES...,0


In [5]:
import torch
import torchaudio
import numpy as np
from torch.utils.data import Dataset

class SpeechEmotionDataset(Dataset):
    """Torch dataset that turns rows of a DataFrame into model-ready samples.

    Each row must provide a ``'speech'`` column (path to an audio file) and a
    ``'label'`` column (integer class id).

    Args:
        df: DataFrame with ``'speech'`` and ``'label'`` columns.
        processor: Callable audio processor; called with the waveform and
            expected to return an object exposing ``input_values``.
        max_length: Number of samples every waveform is padded or truncated to.
        sampling_rate: Rate the audio is resampled to on load and the rate
            reported to the processor.
    """

    def __init__(self, df, processor, max_length=32000, sampling_rate=16000):
        self.df = df
        self.processor = processor
        self.max_length = max_length
        self.sampling_rate = sampling_rate

    def __len__(self):
        """Return the number of rows (samples) in the dataset."""
        return len(self.df)

    @staticmethod
    def _fit_to_length(speech, max_length):
        """Truncate or right-pad ``speech`` with zeros to exactly ``max_length`` samples."""
        if len(speech) > max_length:
            return speech[:max_length]
        return np.pad(speech, (0, max_length - len(speech)), 'constant')

    def __getitem__(self, idx):
        """Load, length-normalise and preprocess the sample at position ``idx``.

        Returns:
            A dict with ``'input_values'`` (the processed waveform with the
            leading batch axis removed) and ``'label'`` (a ``torch.long``
            scalar tensor).
        """
        # Fetch the row once instead of indexing the frame per column.
        row = self.df.iloc[idx]
        audio_path = row['speech']  # path to the audio file
        label = int(row['label'])

        # Load the audio file, resampled to the configured rate.
        speech, _ = librosa.load(audio_path, sr=self.sampling_rate)
        speech = self._fit_to_length(speech, self.max_length)

        # Preprocess the audio data using the processor.
        inputs = self.processor(
            speech,
            sampling_rate=self.sampling_rate,
            return_tensors='pt',
            padding=True,
            truncation=True,
            max_length=self.max_length,
        )
        # Drop only the batch axis; a bare squeeze() would also collapse any
        # other size-1 dimension.
        input_values = inputs.input_values.squeeze(0)

        return {'input_values': input_values, 'label': torch.tensor(label, dtype=torch.long)}


In [6]:

from sklearn.model_selection import train_test_split
# Hold out 20% of the rows for evaluation; the fixed seed keeps the split repeatable.
train_df, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
)

Initialize the processor and model

In [7]:
model_name = "facebook/wav2vec2-base"
processor = Wav2Vec2Processor.from_pretrained(model_name)
# Size the classification head from the label encoding instead of a
# hard-coded 7, and store the id <-> label names in the model config so a
# saved checkpoint carries its own mapping.
model = Wav2Vec2ForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(label_map),
    label2id=label_map,
    id2label=inverse_label_map,
)

Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at facebook/wav2vec2-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Load the dataset

In [12]:
# Wrap each split in the dataset class, sharing the same processor.
train_dataset, test_dataset = (
    SpeechEmotionDataset(split_df, processor) for split_df in (train_df, test_df)
)

Set Training Arguments

In [13]:
# Hyper-parameters for fine-tuning; evaluation and checkpointing both run
# once per epoch, and no external experiment tracker is used.
training_config = dict(
    output_dir="//Users/mac/Documents/speech_model/results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    report_to=[],
)
training_args = TrainingArguments(**training_config)

In [14]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
def compute_metrics(pred):
    """Compute accuracy and weighted precision/recall/F1 for an evaluation pass.

    Args:
        pred: Object exposing ``label_ids`` (true class ids) and
            ``predictions`` (per-class scores, one row per sample).

    Returns:
        Dict with the keys ``accuracy``, ``precision``, ``recall`` and ``f1``.
    """
    y_true = pred.label_ids
    # The predicted class is the index of the highest score in each row.
    y_pred = np.argmax(pred.predictions, axis=1)

    metrics = {"accuracy": accuracy_score(y_true, y_pred)}
    precision, recall, f1, _support = precision_recall_fscore_support(
        y_true, y_pred, average='weighted'
    )
    metrics.update(precision=precision, recall=recall, f1=f1)
    return metrics

In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics
)
trainer.train()

# Persist the fine-tuned model and its processor. The inference section
# loads both from this directory, and no other cell writes it, so without
# this step a fresh run of the notebook cannot reach inference.
trained_model_dir = "/Users/mac/Documents/speech_model/trained_model"
trainer.save_model(trained_model_dir)
processor.save_pretrained(trained_model_dir)

In [None]:
# Run the trainer's evaluation pass and print the metrics it returns.
results = trainer.evaluate()
print(results)

In [8]:
from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification
import torch
import librosa

# Load the fine-tuned processor and model from the same saved directory.
trained_model_dir = "/Users/mac/Documents/speech_model/trained_model"
processor = Wav2Vec2Processor.from_pretrained(trained_model_dir)
model = Wav2Vec2ForSequenceClassification.from_pretrained(trained_model_dir)

print("Model and processor loaded successfully.")


Model and processor loaded successfully.


In [16]:
def predict_emotion(audio_path):
    """Classify the emotion of one audio file with the global model.

    Uses the notebook-level ``processor`` and ``model``.

    Args:
        audio_path: Path to the audio file; it is loaded at 16 kHz.

    Returns:
        The integer class id with the highest logit.
    """
    target_sr = 16000
    waveform, _ = librosa.load(audio_path, sr=target_sr)

    # Turn the raw waveform into model inputs.
    features = processor(waveform, sampling_rate=target_sr, return_tensors="pt", padding=True)

    # Inference only: no gradients needed.
    with torch.no_grad():
        outputs = model(**features)

    return torch.argmax(outputs.logits, dim=-1).item()


In [17]:
audio_path = "/Users/mac/Documents/speech_model/dataset/TESS Toronto emotional speech set data/YAF_angry/YAF_base_angry.wav"  # Provide the path to your local audio file
predicted_emotion = predict_emotion(audio_path)

# Class id -> readable emotion name for the saved model.
# NOTE(review): this table is hard-coded; confirm it matches the label
# encoding that was in effect when the saved model was trained.
emotion_mapping = {
    0: "fear",
    1: "angry",
    2: "disgust",
    3: "neutral",
    4: "sad",
    5: "pleasant surprise",
    6: "happy"
}

print(f"Predicted emotion: {emotion_mapping[predicted_emotion]}")


Predicted emotion: angry


In [None]:
import torch
import numpy as np
import sounddevice as sd
import librosa
import soundfile as sf
from collections import Counter
import os

# Record live audio function with manual input control
def record_audio(sample_rate=16000):
    """Record microphone audio in one-second pieces under keyboard control.

    The user is prompted repeatedly: ``s`` records one more second, ``p``
    marks the recording as paused, ``e`` ends the session. Any other input
    is ignored and the prompt is shown again.

    Args:
        sample_rate: Sampling rate used for recording.

    Returns:
        A 1-D array with all recorded pieces concatenated. If nothing was
        recorded, an empty ``float32`` array is returned instead of raising.
    """
    recorded_audio = []
    is_recording = False

    while True:
        user_input = input("Enter 's' to start/resume, 'p' to pause, 'e' to end: ").strip().lower()

        if user_input == 's':
            if not is_recording:
                print("Recording started. Enter 'p' to pause, 'e' to end.")
                is_recording = True
            # Each 's' captures exactly one second of mono audio.
            audio_chunk = sd.rec(int(1 * sample_rate), samplerate=sample_rate, channels=1, dtype="float32")
            sd.wait()
            recorded_audio.append(audio_chunk.flatten())

        elif user_input == 'p':
            if is_recording:
                print("Recording paused. Enter 's' to resume or 'e' to end.")
                is_recording = False

        elif user_input == 'e':
            print("Recording ended.")
            break

    # np.concatenate raises on an empty list, so handle "ended without
    # recording" explicitly.
    if not recorded_audio:
        return np.zeros(0, dtype=np.float32)

    # Concatenate all audio chunks into one array
    return np.concatenate(recorded_audio)

# Cut a recording into consecutive fixed-length chunks
def split_audio(audio, sample_rate=16000, chunk_duration=3):
    """Split ``audio`` into back-to-back chunks of ``chunk_duration`` seconds.

    Args:
        audio: Sequence of samples (anything supporting ``len`` and slicing).
        sample_rate: Samples per second of ``audio``.
        chunk_duration: Length of each chunk in seconds.

    Returns:
        A list of full-length chunks; trailing samples that do not fill a
        whole chunk are left out. If ``audio`` is shorter than one chunk, it
        is returned unchanged as the only element.
    """
    samples_per_chunk = int(chunk_duration * sample_rate)
    total_samples = len(audio)
    full_chunks = total_samples // samples_per_chunk

    # Too short for even one chunk: hand the audio back as a single piece.
    if total_samples < samples_per_chunk:
        print(f"Not enough audio data to create a chunk of {chunk_duration} seconds.")
        return [audio]

    print(f"Audio length: {total_samples}, Chunk size: {samples_per_chunk}, Number of chunks: {full_chunks}")

    pieces = []
    for index in range(full_chunks):
        start = index * samples_per_chunk
        pieces.append(audio[start:start + samples_per_chunk])
    return pieces

# Predict emotion for each chunk
def predict_emotion_for_chunk(chunk, processor, model, sample_rate=16000):
    """Classify one in-memory audio chunk with the given processor and model.

    The chunk is fed to the model directly, so no temporary file is written
    to the working directory and nothing is left behind if inference fails.
    Skipping the WAV round trip also skips its sample quantisation, so logits
    may differ marginally from a file-based prediction.

    Args:
        chunk: Audio samples; 1-D, or 2-D as (frames, channels).
        processor: Callable that converts a waveform into model inputs.
        model: Callable returning an object with a ``logits`` tensor.
        sample_rate: Sampling rate of ``chunk``.

    Returns:
        The integer class id with the highest logit.
    """
    model_sample_rate = 16000  # rate the rest of the notebook feeds the model

    waveform = np.asarray(chunk, dtype=np.float32)
    if waveform.ndim > 1:
        # Down-mix (frames, channels) audio to mono.
        waveform = waveform.mean(axis=1)
    if sample_rate != model_sample_rate:
        waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=model_sample_rate)

    inputs = processor(waveform, sampling_rate=model_sample_rate, return_tensors="pt", padding=True)

    # Inference only: no gradients needed.
    with torch.no_grad():
        logits = model(**inputs).logits

    return torch.argmax(logits, dim=-1).item()

# Main function for emotion prediction from live audio
def predict_emotion_from_live_audio(processor, model, sample_rate=16000, chunk_duration=3, emotion_mapping=None):
    """Record live audio, classify it chunk by chunk and print the emotion mix.

    Args:
        processor: Audio processor passed on to the per-chunk prediction.
        model: Classification model passed on to the per-chunk prediction.
        sample_rate: Sampling rate used for recording and chunking.
        chunk_duration: Length in seconds of each classified chunk.
        emotion_mapping: Optional dict from class id to emotion name. When
            omitted, the same table as the single-file inference example is
            used, so both code paths name a given class id identically.

    Returns:
        None. The per-emotion percentages are printed.
    """
    if emotion_mapping is None:
        # NOTE(review): confirm this table against the label encoding that
        # was in effect when the saved model was trained.
        emotion_mapping = {
            0: "fear",
            1: "angry",
            2: "disgust",
            3: "neutral",
            4: "sad",
            5: "pleasant surprise",
            6: "happy"
        }

    # Record live audio with user-controlled start, pause, and stop
    live_audio = record_audio(sample_rate=sample_rate)

    # Print length of recorded audio
    print(f"Length of recorded audio: {len(live_audio)} samples")

    # Split audio into chunks of chunk_duration seconds
    chunks = split_audio(live_audio, sample_rate=sample_rate, chunk_duration=chunk_duration)

    # Predict emotion for each chunk and accumulate results
    predictions = []
    for chunk in chunks:
        print("Processing next chunk...")
        if len(chunk) == 0:
            print("Empty chunk detected, skipping.")
            continue
        predicted_label = predict_emotion_for_chunk(chunk, processor, model, sample_rate=sample_rate)
        # Report an id missing from the table instead of raising KeyError.
        predictions.append(emotion_mapping.get(predicted_label, f"unknown ({predicted_label})"))

    total_chunks = len(predictions)
    if total_chunks == 0:
        print("No audio chunks were processed.")
        return

    # Share of chunks assigned to each emotion, in percent.
    emotion_counts = Counter(predictions)
    emotion_percentages = {emotion: (count / total_chunks) * 100 for emotion, count in emotion_counts.items()}

    # Print the final result
    print("Predicted emotion distribution:")
    for emotion, percentage in emotion_percentages.items():
        print(emotion + ": " + str(round(percentage, 2)) + "%")

# File-based prediction helper, defined in this cell so the cell is self-contained
def predict_emotion(audio_path):
    """Classify the emotion of one audio file with the global model.

    Uses the notebook-level ``processor`` and ``model``.

    Args:
        audio_path: Path to the audio file; it is loaded at 16 kHz.

    Returns:
        The integer class id with the highest logit.
    """
    target_sr = 16000
    waveform, _ = librosa.load(audio_path, sr=target_sr)

    # Turn the raw waveform into model inputs.
    features = processor(waveform, sampling_rate=target_sr, return_tensors="pt", padding=True)

    # Inference only: no gradients needed.
    with torch.no_grad():
        outputs = model(**features)

    return torch.argmax(outputs.logits, dim=-1).item()

# Example usage
# Assuming the processor and model are already loaded in the notebook.
# This call is interactive: it prompts on standard input to control the
# recording and prints the resulting emotion distribution.
predict_emotion_from_live_audio(processor, model)
