# Emotion Recognition in Greek Speech Using Wav2Vec 2.0

Since the dataset is hosted on [Mega](https://mega.nz/#F!0ShVXY7C!-73kVoK05OjTPEA95UUvMw) and unable to access it directly, we upload the dataset on the G-drive to make this tutorial accessible.

In [1]:
import pandas as pd
import numpy as np
import os
from pathlib import Path
from tqdm import tqdm
import torchaudio
import sys

In [2]:
#from google.colab import drive
#drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
#!unzip '/content/drive/MyDrive/Research/Multi_model_emotions/archive.zip'

Archive:  /content/drive/MyDrive/Research/Multi_model_emotions/archive.zip
  inflating: Audio Mode Only/Anger/S10-E-A-S1.wav  
  inflating: Audio Mode Only/Anger/S10-E-A-S2.wav  
  inflating: Audio Mode Only/Anger/S10-E-A-S3.wav  
  inflating: Audio Mode Only/Anger/S10-E-A-S4.wav  
  inflating: Audio Mode Only/Anger/S10-E-A-S5.wav  
  inflating: Audio Mode Only/Anger/S10-E-A-S6.wav  
  inflating: Audio Mode Only/Anger/S10-H-A-S1-R1.wav  
  inflating: Audio Mode Only/Anger/S10-H-A-S2-R1.wav  
  inflating: Audio Mode Only/Anger/S10-H-A-S3.wav  
  inflating: Audio Mode Only/Anger/S10-H-A-S4.wav  
  inflating: Audio Mode Only/Anger/S10-H-A-S5.wav  
  inflating: Audio Mode Only/Anger/S10-H-A-S6.wav  
  inflating: Audio Mode Only/Anger/S10-P-A-S1.wav  
  inflating: Audio Mode Only/Anger/S10-P-A-S2.wav  
  inflating: Audio Mode Only/Anger/S10-P-A-S3-R1.wav  
  inflating: Audio Mode Only/Anger/S10-P-A-S4-R1.wav  
  inflating: Audio Mode Only/Anger/S10-P-A-S5-R1.wav  
  inflating: Audio Mode On

In [10]:
data = []

for path in tqdm(Path("//content/Audio Mode Only").glob("**/*.wav")):
    name = str(path).split('/')[-1].split('.')[0]
    label = str(path).split('/')[-2]

    try:
        # There are some broken files
        s = torchaudio.load(path)
        data.append({
            "path": path,
            "emotion": label
        })
    except Exception as e:
        # print(str(path), e)
        pass

    # break

1233it [00:17, 69.26it/s]


In [11]:
df = pd.DataFrame(data)
df.head()

Unnamed: 0,path,emotion
0,//content/Audio Mode Only/Anger/S7-E-A-S4.wav,Anger
1,//content/Audio Mode Only/Anger/S11-E-A-S1.wav,Anger
2,//content/Audio Mode Only/Anger/S6-E-A-S1.wav,Anger
3,//content/Audio Mode Only/Anger/S9-H-A-S5-R1.wav,Anger
4,//content/Audio Mode Only/Anger/S3_H_A_S1_R1.wav,Anger


Let's explore how many labels (emotions) are in the dataset with what distribution.

In [12]:
print("Labels: ", df["emotion"].unique())
print()
df.groupby("emotion").count()[["path"]]

Labels:  ['Anger' 'Happy' 'Disgust' 'Sad' 'Fear' 'Neutral']



Unnamed: 0_level_0,path
emotion,Unnamed: 1_level_1
Anger,203
Disgust,206
Fear,199
Happy,207
Neutral,205
Sad,213


For training purposes, we need to split data into train test sets; in this specific example, we break with a `20%` rate for the test set.

In [13]:
import torchaudio
import librosa
import IPython.display as ipd
import numpy as np

idx = np.random.randint(0, len(df))
sample = df.iloc[idx]
path = sample["path"]
label = sample["emotion"]


print(f"ID Location: {idx}")
print(f"      Label: {label}")
print()

speech, sr = torchaudio.load(path)
speech = speech[0].numpy().squeeze()
speech = librosa.resample(np.asarray(speech), orig_sr = sr, target_sr = 16_000)
ipd.Audio(data=np.asarray(speech), autoplay=False, rate=16000)

ID Location: 807
      Label: Sad



In [14]:
from sklearn.model_selection import train_test_split

save_path = "/content/Audio Mode Only"

train_df, test_df = train_test_split(df, test_size=0.2, random_state=101, stratify=df["emotion"])

train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

train_df.to_csv(f"{save_path}/train.csv", sep="\t", encoding="utf-8", index=False)
test_df.to_csv(f"{save_path}/test.csv", sep="\t", encoding="utf-8", index=False)


print(train_df.shape)
print(test_df.shape)

(986, 2)
(247, 2)


## Prepare Data for Training

In [20]:
#!pip install datasets

Collecting datasets
  Downloading datasets-2.20.0-py3-none-any.whl (547 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/547.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m81.9/547.8 kB[0m [31m2.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m547.8/547.8 kB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
Collecting pyarrow>=15.0.0 (from datasets)
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl (39.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m39.9/39.9 MB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m14.7 MB/s[0m eta [36m0:00:00[0m
Collecting requests>=2.32.2 (from datasets)
  Downloading requests-2.32.3-py3-none-any.

In [2]:
# Loading the created dataset using datasets
from datasets import load_dataset
#from evaluate import load

data_files = {
    "train": "/content/Audio Mode Only/train.csv",
    "validation": "/content/Audio Mode Only/test.csv",
}

dataset = load_dataset("csv", data_files=data_files, delimiter="\t", )
train_dataset = dataset["train"]
eval_dataset = dataset["validation"]

print(train_dataset)
print(eval_dataset)

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Dataset({
    features: ['path', 'emotion'],
    num_rows: 986
})
Dataset({
    features: ['path', 'emotion'],
    num_rows: 247
})


In [3]:
# We need to specify the input and output column
input_column = "path"
output_column = "emotion"

In [4]:
# we need to distinguish the unique labels in our SER dataset
label_list = train_dataset.unique(output_column)
label_list.sort()  # Let's sort it for determinism
num_labels = len(label_list)
print(f"A classification problem with {num_labels} classes: {label_list}")

A classification problem with 6 classes: ['Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad']


In [5]:
from transformers import AutoConfig, Wav2Vec2Processor,Wav2Vec2Model

In [6]:
model_name_or_path = "facebook/wav2vec2-large-960h-lv60-self"
pooling_mode = "mean"

In [7]:
# config
config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    label2id={label: i for i, label in enumerate(label_list)},
    id2label={i: label for i, label in enumerate(label_list)},
    finetuning_task="wav2vec2_clf",
    padding = True
)
setattr(config, 'pooling_mode', pooling_mode)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.61k [00:00<?, ?B/s]

In [8]:
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path,)
target_sampling_rate = processor.feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sampling_rate}")

preprocessor_config.json:   0%|          | 0.00/158 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/162 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/291 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

The target sampling rate: 16000


# Preprocess Data

In [10]:
import torchaudio
def speech_file_to_array_fn(path):
    speech_array, sampling_rate = torchaudio.load(path)
    resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
    speech = resampler(speech_array[0]).squeeze().numpy()
    return speech

def label_to_id(label, label_list):
    if len(label_list) > 0:
        return label_list.index(label) if label in label_list else -1
    return label

def preprocess_function(examples):
    speech_list = [speech_file_to_array_fn(path) for path in examples[input_column]]
    target_list = [label_to_id(label, label_list) for label in examples[output_column]]
    result = processor(speech_list, sampling_rate=target_sampling_rate, padding=True, return_tensors="pt")
    result["labels"] = list(target_list)

    return result

train_dataset = train_dataset.map(
    preprocess_function,
    batch_size=100,
    batched=True,
    num_proc=4
)
eval_dataset = eval_dataset.map(
    preprocess_function,
    batch_size=100,
    batched=True,
    num_proc=4
)

Map (num_proc=4):   0%|          | 0/986 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/247 [00:00<?, ? examples/s]

In [11]:
idx = 0
print(f"Training input_values: {train_dataset[idx]['input_values']}")
print(f"Training attention_mask: {train_dataset[idx]['attention_mask']}")
print(f"Training labels: {train_dataset[idx]['labels']} - {train_dataset[idx]['emotion']}")

Training input_values: [-0.0007983839022926986, -0.004181321710348129, -0.007581937592476606, -0.009738811291754246, -0.007958431728184223, -0.006933294702321291, -0.015285294502973557, -0.017825039103627205, -0.03451135382056236, -0.023403234779834747, -0.018646424636244774, -0.024064399302005768, -0.03563396632671356, -0.047948941588401794, -0.05608551949262619, -0.0520067922770977, -0.06188559532165527, -0.06086096912622452, -0.054427266120910645, -0.07779794186353683, -0.07774066925048828, -0.07283812016248703, -0.06949956715106964, -0.0665702298283577, -0.04894484207034111, -0.03657533973455429, -0.07895863801240921, -0.07946454733610153, -0.10482288897037506, -0.11547647416591644, -0.06797947734594345, -0.07945135980844498, -0.07593602687120438, -0.04507554695010185, -0.0704478994011879, -0.07734686136245728, -0.08908087015151978, -0.11744483560323715, -0.08884221315383911, -0.06934897601604462, -0.06612904369831085, -0.09775745123624802, -0.0984620526432991, -0.0984906479716301,

## Model

Before diving into the training part, we need to build our classification model based on the merge strategy.

In [12]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


@dataclass
class SpeechClassifierOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None


In [13]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)


class Wav2Vec2ClassificationHead(nn.Module):
    """Head for wav2vec classification task."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            raise Exception(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [14]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import torch

import transformers
from transformers import Wav2Vec2Processor


@dataclass
class DataCollatorCTCWithPadding:
    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [feature["labels"] for feature in features]

        d_type = torch.long if isinstance(label_features[0], int) else torch.float

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        batch["labels"] = torch.tensor(label_features, dtype=d_type)

        return batch

In [15]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

Next, the evaluation metric is defined. There are many pre-defined metrics for classification/regression problems, but in this case, we would continue with just **Accuracy** for classification and **MSE** for regression. You can define other metrics on your own.

In [16]:
is_regression = False

In [17]:
import numpy as np
from transformers import EvalPrediction


def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    preds = np.squeeze(preds) if is_regression else np.argmax(preds, axis=1)

    if is_regression:
        return {"mse": ((preds - p.label_ids) ** 2).mean().item()}
    else:
        return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}

Now, we can load the pretrained XLSR-Wav2Vec2 checkpoint into our classification model with a pooling strategy.

In [18]:
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    model_name_or_path,
    config=config,
)

pytorch_model.bin:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at facebook/wav2vec2-large-960h-lv60-self and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


The first component of XLSR-Wav2Vec2 consists of a stack of CNN layers that are used to extract acoustically meaningful - but contextually independent - features from the raw speech signal. This part of the model has already been sufficiently trained during pretraining and as stated in the [paper](https://arxiv.org/pdf/2006.13979.pdf) does not need to be fine-tuned anymore.
Thus, we can set the `requires_grad` to `False` for all parameters of the *feature extraction* part.

In [None]:
#!pip install accelerate --upgrade --force-reinstall


In [19]:
model.freeze_feature_extractor()

In [20]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="/content/speech-emotion-recognition",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    eval_strategy="steps",
    num_train_epochs=1.0,
    fp16=False,
    save_steps=10,
    eval_steps=10,
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=2,
)


For future use we can create our training script, we do it in a simple way. You can add more on you own.

In [22]:
from typing import Any, Dict, Union

import torch
from packaging import version
from torch import nn

from transformers import (
    Trainer,
    is_apex_available,
)

if is_apex_available():
    from apex import amp

if version.parse(torch.__version__) >= version.parse("1.6"):
    _is_native_amp_available = True
    from torch.cuda.amp import autocast


class CTCTrainer(Trainer):
    def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
        model.train()
        inputs = self._prepare_inputs(inputs)

        if self.use_amp:
            with autocast():
                loss = self.compute_loss(model, inputs)
        else:
            loss = self.compute_loss(model, inputs)

        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        if self.use_amp:
            self.scaler.scale(loss).backward()
        elif self.use_apex:
            with amp.scale_loss(loss, self.optimizer) as scaled_loss:
                scaled_loss.backward()
        elif self.deepspeed:
            self.deepspeed.backward(loss)
        else:
            loss.backward()

        return loss.detach()


In [23]:
from transformers import Trainer
from typing import Any, Dict, Union
import torch
from torch import nn

class CTCTrainer(Trainer):
    def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
        model.train()
        inputs = self._prepare_inputs(inputs)

        loss = self.compute_loss(model, inputs)

        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        loss.backward()

        return loss.detach()

Now, all instances can be passed to Trainer and we are ready to start training!

In [24]:
trainer = CTCTrainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=processor.feature_extractor,
)

### Training

Training will take between 10 and 60 minutes depending on the GPU allocated to this notebook.

In case you want to use this google colab to fine-tune your model, you should make sure that your training doesn't stop due to inactivity. A simple hack to prevent this is to paste the following code into the console of this tab (right mouse click -> inspect -> Console tab and insert code).

```javascript
function ConnectButton(){
    console.log("Connect pushed");
    document.querySelector("#top-toolbar > colab-connect-button").shadowRoot.querySelector("#connect").click()
}
setInterval(ConnectButton,60000);
```

In [25]:
trainer.train()

Step,Training Loss,Validation Loss,Accuracy
10,1.8176,1.803494,0.165992
20,1.7957,1.801236,0.165992
30,1.7949,1.820149,0.157895
40,1.8111,1.828474,0.161943
50,1.841,1.803328,0.174089
60,1.8156,1.79201,0.17004
70,1.8145,1.778564,0.263158
80,1.7704,1.7764,0.174089
90,1.7809,1.777816,0.174089
100,1.804,1.769888,0.174089


TrainOutput(global_step=123, training_loss=1.7962333593911273, metrics={'train_runtime': 1218.8926, 'train_samples_per_second': 0.809, 'train_steps_per_second': 0.101, 'total_flos': 1.50629086509312e+17, 'train_loss': 1.7962333593911273, 'epoch': 0.9959514170040485})

## Evaluation

In [26]:
import librosa
from sklearn.metrics import classification_report

In [28]:
test_dataset = load_dataset("csv", data_files={"test": "/content/Audio Mode Only/test.csv"}, delimiter="\t")["test"]
test_dataset

Generating test split: 0 examples [00:00, ? examples/s]

Dataset({
    features: ['path', 'emotion'],
    num_rows: 247
})

In [29]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

Device: cuda


In [44]:
from transformers import AutoConfig
model_name_or_path = "facebook/wav2vec2-large-960h-lv60-self"
pooling_mode = "mean"

checkpoint_dir = "/content/speech-emotion-recognition/checkpoint-123"

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Wav2Vec2ForSpeechClassification.from_pretrained(checkpoint_dir).to(device)
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)


In [None]:

import torch
from huggingface_hub import notebook_login

model_name_or_path = "facebook/wav2vec2-large-960h-lv60-self"
checkpoint_dir = "/content/speech-emotion-recognition/checkpoint-123"

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Wav2Vec2ForSpeechClassification.from_pretrained(checkpoint_dir).to(device)
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)

notebook_login()

repo_name = "FiendHunter/Wave2vec_pumave"

model.push_to_hub(repo_name)

processor.push_to_hub(repo_name)


VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

README.md:   0%|          | 0.00/24.0 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.27G [00:00<?, ?B/s]

In [None]:
'''import torchaudio
def speech_file_to_array_fn(path):
    speech_array, sampling_rate = torchaudio.load(path)
    resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
    speech = resampler(speech_array[0]).squeeze().numpy()
    return speech

def label_to_id(label, label_list):
    if len(label_list) > 0:
        return label_list.index(label) if label in label_list else -1
    return label

def preprocess_function(examples):
    speech_list = [speech_file_to_array_fn(path) for path in examples[input_column]]
    target_list = [label_to_id(label, label_list) for label in examples[output_column]]
    result = processor(speech_list, sampling_rate=target_sampling_rate, padding=True, return_tensors="pt")
    result["labels"] = list(target_list)

    return result

train_dataset = train_dataset.map(
    preprocess_function,
    batch_size=100,
    batched=True,
    num_proc=4
)
eval_dataset = eval_dataset.map(
    preprocess_function,
    batch_size=100,
    batched=True,
    num_proc=4
)'''

In [52]:
def speech_file_to_array_fn(batch):
    speech_array, sampling_rate = torchaudio.load(batch["path"])
    speech_array = speech_array.squeeze().numpy()
    speech_array = librosa.resample(np.asarray(speech_array), orig_sr=sampling_rate, target_sr=processor.feature_extractor.sampling_rate)

    batch["speech"] = speech_array
    return batch

def predict(batch):
    features = processor(
        batch["speech"],
        sampling_rate=processor.feature_extractor.sampling_rate,
        return_tensors="pt",
        padding=True
    )

    input_values = features.input_values.to(device)
    attention_mask = features.attention_mask.to(device)

    with torch.no_grad():
        logits = model(input_values, attention_mask=attention_mask).logits

    pred_ids = torch.argmax(logits, dim=-1).detach().cpu().numpy()
    batch["predicted"] = pred_ids
    return batch

# Apply the preprocessing function
test_dataset = test_dataset.map(speech_file_to_array_fn, batched=False)

# Ensuring that each batch has the same length
def pad_sequences(batch):
    max_length = max(len(seq) for seq in batch["speech"])
    batch["speech"] = [np.pad(seq, (0, max_length - len(seq))) for seq in batch["speech"]]
    return batch

test_dataset = test_dataset.map(pad_sequences, batched=True, batch_size=8)

# Run the prediction
result = test_dataset.map(predict, batched=True, batch_size=8)

Map:   0%|          | 0/247 [00:00<?, ? examples/s]

Map:   0%|          | 0/247 [00:00<?, ? examples/s]

Map:   0%|          | 0/247 [00:00<?, ? examples/s]

ValueError: Unable to create tensor, you should probably activate padding with 'padding=True' to have batched tensors with the same length.

In [53]:
label_names = [config.id2label[i] for i in range(config.num_labels)]
label_names

AttributeError: 'str' object has no attribute 'num_labels'

In [54]:
y_true = [config.label2id[name] for name in result["emotion"]]
y_pred = result["predicted"]

print(y_true[:5])
print(y_pred[:5])

NameError: name 'result' is not defined

In [55]:
print(classification_report(y_true, y_pred, target_names=label_names))

NameError: name 'y_true' is not defined

# Prediction

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from transformers import AutoConfig, Wav2Vec2Processor

import librosa
import IPython.display as ipd
import numpy as np
import pandas as pd

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_name_or_path = "m3hrdadfi/wav2vec2-xlsr-greek-speech-emotion-recognition"
config = AutoConfig.from_pretrained(model_name_or_path)
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)
sampling_rate = processor.feature_extractor.sampling_rate
model = Wav2Vec2ForSpeechClassification.from_pretrained(model_name_or_path).to(device)

In [None]:
def speech_file_to_array_fn(path, sampling_rate):
    speech_array, _sampling_rate = torchaudio.load(path)
    resampler = torchaudio.transforms.Resample(_sampling_rate)
    speech = resampler(speech_array).squeeze().numpy()
    return speech


def predict(path, sampling_rate):
    speech = speech_file_to_array_fn(path, sampling_rate)
    features = processor(speech, sampling_rate=sampling_rate, return_tensors="pt", padding=True)

    input_values = features.input_values.to(device)
    attention_mask = features.attention_mask.to(device)

    with torch.no_grad():
        logits = model(input_values, attention_mask=attention_mask).logits

    scores = F.softmax(logits, dim=1).detach().cpu().numpy()[0]
    outputs = [{"Emotion": config.id2label[i], "Score": f"{round(score * 100, 3):.1f}%"} for i, score in enumerate(scores)]
    return outputs


STYLES = """
<style>
div.display_data {
    margin: 0 auto;
    max-width: 500px;
}
table.xxx {
    margin: 50px !important;
    float: right !important;
    clear: both !important;
}
table.xxx td {
    min-width: 300px !important;
    text-align: center !important;
}
</style>
""".strip()

def prediction(df_row):
    path, emotion = df_row["path"], df_row["emotion"]
    df = pd.DataFrame([{"Emotion": emotion, "Sentence": "    "}])
    setup = {
        'border': 2,
        'show_dimensions': True,
        'justify': 'center',
        'classes': 'xxx',
        'escape': False,
    }
    ipd.display(ipd.HTML(STYLES + df.to_html(**setup) + "<br />"))
    speech, sr = torchaudio.load(path)
    speech = speech[0].numpy().squeeze()
    speech = librosa.resample(np.asarray(speech), sr, sampling_rate)
    ipd.display(ipd.Audio(data=np.asarray(speech), autoplay=True, rate=sampling_rate))

    outputs = predict(path, sampling_rate)
    r = pd.DataFrame(outputs)
    ipd.display(ipd.HTML(STYLES + r.to_html(**setup) + "<br />"))

In [None]:
test = pd.read_csv("/content/data/test.csv", sep="\t")
test.head()

In [None]:
prediction(test.iloc[0])

In [None]:
prediction(test.iloc[1])

In [None]:
prediction(test.iloc[2])