We wanted a dataset of audio files in which multiple people say the same phrase, each labeled with pronunciation scores, but such a dataset was hard to find, so we decided to label the pronunciation scores manually. In a real service, we assume that experts would do this manual labeling. However, labeling every audio file by hand is too exhausting to be practical, so we used a few-shot learning technique. The idea of few-shot learning is that the model “learns to discriminate” from the training set and, when a query comes in, guesses which part of the support set the query is most similar to. In other words, it does not solve the problem of which class the query audio ‘belongs to’, but rather which class it is ‘similar to’.
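
As a rough illustration of the ‘similar to’ idea, the sketch below assigns a query vector to the class whose support examples it resembles most. The toy 2-D vectors, the class names, and the `most_similar_class` helper are assumptions made up for this example; the notebook itself compares learned audio embeddings and sums the similarities instead of averaging them.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def most_similar_class(query, support_set):
    """Return the label whose support vectors are, on average, closest to the query."""
    scores = {
        label: sum(cosine_similarity(query, v) for v in vectors) / len(vectors)
        for label, vectors in support_set.items()
    }
    return max(scores, key=scores.get)

# hypothetical support set: two labeled examples per class
support_set = {
    "low": [[1.0, 0.1], [0.9, 0.2]],
    "high": [[0.1, 1.0], [0.2, 0.9]],
}
print(most_similar_class([0.15, 0.8], support_set))
```

The query is never classified directly; it is only compared against labeled examples, which is why a handful of labels per class can be enough to get started.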

In this stage, we first convert our wav files into tensors. Before few-shot learning, we manually labeled a sample of the data with scores of 0, 1, or 2 (higher means better pronunciation) for accuracy, completeness, fluency, and prosodic. We then used these labeled samples and few-shot learning to assign pronunciation scores to the entire dataset. The four evaluation metrics are defined below.

1. **accuracy:** how correctly the learner pronounces each word in the utterance
2. **completeness:** the percentage of the words that are actually pronounced
3. **fluency:** whether the speaker pronounces smoothly and without unnecessary pauses
4. **prosodic:** whether the speaker uses correct intonation and a stable speaking speed and rhythm
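
To make the completeness definition concrete, here is a minimal sketch that computes the share of expected words that were actually pronounced. The `completeness_ratio` helper and the word lists are hypothetical; in this notebook the scores were assigned by listening, not by code.

```python
def completeness_ratio(expected_words, pronounced_words):
    """Fraction of the expected words found among the pronounced words."""
    if not expected_words:
        raise ValueError("expected_words must not be empty")
    pronounced = {w.lower() for w in pronounced_words}
    hits = sum(1 for w in expected_words if w.lower() in pronounced)
    return hits / len(expected_words)

# hypothetical utterance: the speaker skipped one of three words
expected = ["please", "call", "stella"]
print(completeness_ratio(expected, ["please", "stella"]))
```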

# 1. Import packages

First, import the packages and mount Google Drive.

In [None]:
import os
import torch
import math
import warnings
import torch.nn as nn
import numpy as np
import pandas as pd
import torch.optim as optim
import torch.nn.functional as F
import soundfile as sf
from sklearn.model_selection import train_test_split
from transformers import Wav2Vec2Processor, Wav2Vec2Model
from tqdm import tqdm
from itertools import combinations, product

warnings.filterwarnings('ignore')

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# 2. Change wav to tensors and manually label samples

First, make sure a GPU is available. We strongly recommend using a GPU to run this code.

In [None]:
# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


Then load the Wav2Vec2 model and processor. The Wav2Vec2 model transforms our wav files into tensors. The Wav2Vec2 processor wraps the Wav2Vec2 feature extractor and the Wav2Vec2 CTC tokenizer in a single object. We use it for its feature-extractor functionality, which converts raw audio into the input format the model expects.

In [None]:
# Load wav2vec2.0 model and processor
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base").to(device)

Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Then, we define a `wav2vec` function that uses the Wav2Vec2 model to convert an audio file into a tensor. The function first loads the audio file from the specified path with `sf.read` from the `soundfile` library and processes the audio with the `processor` loaded above. Note that you have to resample your audio files to a 16,000 Hz sampling rate beforehand, since the Wav2Vec2 model was pre-trained on 16,000 Hz audio. The processed input is returned as a PyTorch tensor and stored in the variable `input_values`. Next, the function moves the input to the selected device (GPU or CPU), feeds `input_values` into the model, and reads the `last_hidden_state` attribute to retrieve the final hidden states. Lastly, it averages these features over the time dimension and returns the result as `fixed_length_vector`.

In [None]:
def wav2vec(audio_path):
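    # load audio file; the model expects 16 kHz input, so fail early on any other rate
    audio_input, sample_rate = sf.read(audio_path)
    if sample_rate != 16000:
        raise ValueError(f"{audio_path}: expected 16000 Hz audio, got {sample_rate} Hz")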

    # prepare input data
    input_values = processor(audio_input, return_tensors="pt", sampling_rate=16000).input_values

    # move input data to GPU
    input_values = input_values.to(device)

    # predict by using wav2vec model
    with torch.no_grad():
        features = model(input_values).last_hidden_state

    # transform to fixed_length vector
    fixed_length_vector = torch.mean(features, dim=1)

    return fixed_length_vector
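
Because the function assumes 16,000 Hz input, files recorded at another rate need resampling first. The sketch below only works out the integer up/down factors that a polyphase resampler would need; `resample_factors` is a hypothetical helper, and the resampling itself would be delegated to an audio library of your choice.

```python
from math import gcd

TARGET_RATE = 16000  # sampling rate the pre-trained Wav2Vec2 model expects

def resample_factors(source_rate, target_rate=TARGET_RATE):
    """Return (up, down) such that source_rate * up / down == target_rate."""
    if source_rate <= 0 or target_rate <= 0:
        raise ValueError("sampling rates must be positive")
    divisor = gcd(source_rate, target_rate)
    return target_rate // divisor, source_rate // divisor

for rate in (16000, 44100, 48000):
    up, down = resample_factors(rate)
    print(rate, "->", TARGET_RATE, "up =", up, "down =", down)
```

One possible use, stated as an assumption rather than the notebook's method, is to pass these factors to `scipy.signal.resample_poly(audio, up, down)` before calling `wav2vec`.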

In the next step, we create a new data frame that will hold the file path and tensor of each wav file. We first store the file path of each wav file.

In [None]:
# initialize new dataframe
df = pd.DataFrame(columns = ['file_path', 'output_path','vector', 'accuracy', 'completeness', 'fluency', 'prosodic'])

In [None]:
# load reference audio files
original_path = 'your_own_path/recordings/original'
original_paths = [os.path.join(original_path, f) for f in os.listdir(original_path)]
df['file_path'] = original_paths

Then, we create a reference dataset by converting each audio file into a same-sized tensor. Note that, depending on the shape of your data frame, you will need to change the indexes passed to `df.iloc` and `df.iat` accordingly. Also, if you have large audio files, we recommend wrapping the conversion in a try/except statement, since a long file can cause a ‘**CUDA out of memory**’ error.

In [None]:
# change the audio data to tensors
for i in tqdm(range(len(df))):
  try:
    y = wav2vec(df.iloc[i,0])
    df.iat[i,2] = y
  except Exception as e:
    print(i,e)

 98%|█████████▊| 1992/2036 [07:19<00:14,  3.07it/s]

1990 CUDA out of memory. Tried to allocate 11.42 GiB. GPU 0 has a total capacty of 15.77 GiB of which 2.05 GiB is free. Process 50015 has 13.71 GiB memory in use. Of the allocated memory 12.11 GiB is allocated by PyTorch, and 172.18 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF


100%|██████████| 2036/2036 [07:29<00:00,  4.53it/s]
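
The try/except pattern above can also record which files failed, so that they can be retried or dropped later. The sketch below uses a hypothetical `convert_all` helper and a fake converter in place of `wav2vec`, so it runs without a GPU.

```python
def convert_all(paths, convert):
    """Apply convert to each path and return (results, failures)."""
    results, failures = {}, {}
    for path in paths:
        try:
            results[path] = convert(path)
        except Exception as exc:  # e.g. a CUDA out-of-memory RuntimeError
            failures[path] = str(exc)
    return results, failures

def fake_convert(path):
    # stand-in for wav2vec: pretend that very long files exhaust GPU memory
    if "long" in path:
        raise RuntimeError("CUDA out of memory (simulated)")
    return [0.0] * 4

results, failures = convert_all(["a.wav", "long_b.wav", "c.wav"], fake_convert)
print(len(results), "converted;", len(failures), "failed:", list(failures))
```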


Lastly, we remove the rows that have a NaN value (data that failed to be transformed into a tensor for lack of GPU memory) and save the data frame as an `audio_reference.pkl` file. At this stage, we listened to each sample speech file and manually labeled its pronunciation scores. Since this is a somewhat cumbersome process, you can simply use the `audio_reference.pkl` file provided in the shared folder.

In [None]:
df = df[df['vector'].notnull()]
df.reset_index(inplace = True, drop=True)
df

| | file_path | output_path | vector | accuracy | completeness | fluency | prosodic |
|---|---|---|---|---|---|---|---|
| 0 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.2533, device='cuda:0'), tensor(0.0... | | | | |
| 1 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0094, device='cuda:0'), tensor(-0.... | | | | |
| 2 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(0.0100, device='cuda:0'), tensor(-0.0... | | | | |
| 3 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0126, device='cuda:0'), tensor(-0.... | | | | |
| 4 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.1092, device='cuda:0'), tensor(-0.... | | | | |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 2030 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0628, device='cuda:0'), tensor(-0.... | | | | |
| 2031 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0545, device='cuda:0'), tensor(0.0... | | | | |
| 2032 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0671, device='cuda:0'), tensor(-0.... | | | | |
| 2033 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.1040, device='cuda:0'), tensor(-0.... | | | | |


In [None]:
# after storing the data frame, we manually labeled sample wav files and updated the audio_reference.pkl file
df.to_pickle('your_own_path/audio_reference.pkl')

# 3. Few shot learning

Before few-shot learning, we load the `audio_reference.pkl` file, which contains every wav file path and tensor, along with the manual labels for some of the wav files.

In [None]:
# Note that you have to manually label your samples before this step
# At this point, audio_reference.pkl contains the manually labeled sample scores
df = pd.read_pickle('your_own_path/audio_reference.pkl')

Then, we define the `Audio_Encoder` class, which is based on the Transformer encoder. The model first creates a tensor filled with zeros and copies each input into it so that all inputs share the same sequence length. Next, we generate a random tensor and prepend it to each input sequence. After that, we build a padding mask that masks out the empty positions of each sequence and feed the batch into the Transformer encoder. Finally, we take the first output vector, the one at the position of the prepended tensor, and use it as the feature vector. In this way, the encoder embeds the input audio data to extract high-dimensional features.

In [None]:
# Define Encoder class that extract high dimensional features
class Audio_Encoder(nn.Module):
    def __init__(self, num_heads, num_layers):
        super().__init__()
        self.sentecne_level = nn.TransformerEncoder(nn.TransformerEncoderLayer(d_model=768, nhead=num_heads),num_layers=num_layers)

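    def forward(self, batch):
        # zero-pad every item to the longest sequence in the batch
        max_len = max([e.size(0) for e in batch])
        padded_embeddings = torch.zeros(len(batch), max_len, batch[0].size(1)).to(device)
        for i, emb in enumerate(batch):
            seq_len = emb.size(0)
            padded_embeddings[i, :seq_len, :] = emb
        # prepend a random vector to each sequence; its output slot is read as the feature
        random_tensor = torch.randn(padded_embeddings.size(0), 1, padded_embeddings.size(2)).to(device)
        batch_tensor = torch.cat((random_tensor, padded_embeddings), dim=1)
        # reorder to (seq_len, batch, d_model), the default layout of nn.TransformerEncoder
        batch_tensor = batch_tensor.permute(1 ,0 ,2).float()
        # flag positions whose features sum to zero as padding; mask shape is (batch, seq_len)
        padding_mask = batch_tensor.sum(dim=-1).permute(1 ,0) == 0
        output_batch = self.sentecne_level(batch_tensor.float(), src_key_padding_mask=padding_mask)
        # back to (batch, seq_len, d_model), then keep the first position of each sequence
        output_batch = output_batch.permute(1 ,0 ,2)
        feature_vecs = output_batch[:,0,:]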
        return feature_vecs
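
The padding and masking steps can be pictured on plain Python lists. The sketch below is torch-free and uses a hypothetical `pad_and_mask` helper; it mirrors the encoder's rule of treating positions whose features sum to zero as padding, and it leaves out the prepended random vector.

```python
def pad_and_mask(batch, feature_dim):
    """Zero-pad sequences to equal length and flag the padded positions."""
    max_len = max(len(seq) for seq in batch)
    padded, mask = [], []
    for seq in batch:
        rows = [list(frame) for frame in seq]
        rows += [[0.0] * feature_dim for _ in range(max_len - len(seq))]
        padded.append(rows)
        # True marks a position that attention should ignore
        mask.append([sum(frame) == 0 for frame in rows])
    return padded, mask

batch = [
    [[0.5, 0.1], [0.2, 0.3], [0.4, 0.4]],  # three frames
    [[0.9, 0.7]],                          # one frame
]
padded, mask = pad_and_mask(batch, feature_dim=2)
print(mask)
```

Under this rule, a real frame whose features happened to sum to exactly zero would also be masked, which is unlikely for floating-point embeddings but worth keeping in mind.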

Next, we define the `few_shot_Model` class. It holds an encoder, two fully connected layers (`self.fc1`, `self.fc2`), a ReLU activation function (`self.ac`), and a sigmoid function (`self.sigmoid`); `self.encoder` takes the `Audio_Encoder` instance we defined earlier. The model first embeds the input data (`voice_pair`) with the encoder and passes the result through the first fully connected layer and the ReLU activation. After that, it passes the data through the second fully connected layer and computes the cosine similarity between the two voice samples. This similarity value is the final output of the model; `self.sigmoid` is defined but not applied inside `forward`.

In [None]:
# Define few_shot_Model class
class few_shot_Model(nn.Module):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder
        self.fc1 = nn.Linear(768,768)
        self.ac = nn.ReLU()
        self.fc2 = nn.Linear(768,256)
        self.sigmoid = nn.Sigmoid()

    def forward(self, voice_pair):
        voice_pair = self.encoder(voice_pair)
        voice_pair = self.fc1(voice_pair)
        voice_pair = self.ac(voice_pair)
        voice_pair = self.fc2(voice_pair)
        similarity = torch.cosine_similarity(voice_pair[0], voice_pair[1], dim=0)
        out = similarity
        return out
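
For reference, the cosine similarity returned for the two projected embeddings $u$ and $v$ (the two 256-dimensional rows of the `fc2` output) follows the standard definition below. The symbols $u$, $v$, and $k$ are notation introduced here for illustration, and PyTorch additionally guards the denominator with a small epsilon.

```latex
\cos(u, v)
  = \frac{u \cdot v}{\lVert u \rVert \, \lVert v \rVert}
  = \frac{\sum_{k=1}^{256} u_k v_k}
         {\sqrt{\sum_{k=1}^{256} u_k^{2}} \; \sqrt{\sum_{k=1}^{256} v_k^{2}}},
\qquad -1 \le \cos(u, v) \le 1
```

A value near 1 means the two embeddings point in nearly the same direction, and a value near -1 means they point in opposite directions.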

We then split the data frame into manually labeled rows and unlabeled rows, stored in the `train` and `test` variables, respectively. We also store the indexes of the data labeled 0, 1, and 2 in the variables `g1`, `g2`, and `g3`, respectively. (Here we show the accuracy case, but we ran the whole process for prosodic, completeness, and fluency as well.)

In [None]:
# split train and test data
train = df[df['accuracy'].notnull()]
test = df[df['accuracy'].isnull()]

In [None]:
# check index of 0,1 and 2
g1 = train[train['accuracy']==0].index
g2 = train[train['accuracy']==1].index
g3 = train[train['accuracy']==2].index

We use binary cross-entropy as the loss function and Adam as the optimizer.

In [None]:
# instantiate encoder model and set hyper parameters
encoder = Audio_Encoder(1,1)
f_model = few_shot_Model(encoder).to(device)
criterion = nn.BCELoss().to(device)
optimizer = torch.optim.Adam(f_model.parameters(), lr=0.00001)
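
For intuition, binary cross-entropy for a single prediction $p$ and target $y$ is $-[y \log p + (1 - y)\log(1 - p)]$. The pure-Python sketch below, built around a hypothetical `bce` helper, shows how the loss behaves for the targets 1 and 0 used here. It assumes the prediction lies strictly between 0 and 1; `nn.BCELoss` likewise expects inputs in the range 0 to 1.

```python
import math

def bce(prediction, target):
    """Binary cross-entropy for one prediction in (0, 1) and a target of 0 or 1."""
    if not 0.0 < prediction < 1.0:
        raise ValueError("prediction must lie strictly between 0 and 1")
    return -(target * math.log(prediction) + (1 - target) * math.log(1 - prediction))

# same-class pair (target 1): a high similarity gives a small loss
print(bce(0.9, 1.0), bce(0.1, 1.0))
# different-class pair (target 0): a high similarity gives a large loss
print(bce(0.9, 0.0), bce(0.1, 0.0))
```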

The code below implements the entire training process. In the first part (data with the same class), we use the `combinations` function to generate data pairs within each class. Since both items of such a pair belong to the same class, the target is set to 1. For each data pair, the model computes its output, and the loss is calculated by comparing the output to the target. After these losses are summed, the gradient is computed via `backward()` and the parameters are updated by calling the `step()` method of the `optimizer`. This process is performed independently for `g1`, `g2`, and `g3`. In the second part (data with different classes), we use the `product` function to create data pairs across two different classes. Since the items of such a pair belong to different classes, the target is set to 0. The rest of the process is the same as in the first part, except that it is performed independently for `g1` / `g2`, `g1` / `g3`, and `g2` / `g3`.

In [None]:
# each entry is (index pairs, target): same-class pairs get 1, different-class pairs get 0
pair_groups = [
    (list(combinations(g1, 2)), 1.0),
    (list(combinations(g2, 2)), 1.0),
    (list(combinations(g3, 2)), 1.0),
    (list(product(g1, g2)), 0.0),
    (list(product(g1, g3)), 0.0),
    (list(product(g2, g3)), 0.0),
]

for epoch in range(10) :
    epoch_loss = 0
    f_model.train()

    # one optimizer step per group, taken on the summed loss of all pairs in that group
    for pairs, label in pair_groups :
        target = torch.tensor([label]).to(device)
        total_loss = 0
        optimizer.zero_grad()
        for i,j in pairs :
            data = df.iloc[[i,j],2]
            data = tuple(d.to(device) for d in data)
            output = f_model(data).unsqueeze(0)
            loss = criterion(output, target)
            total_loss += loss
        total_loss.backward()
        optimizer.step()
        epoch_loss += total_loss

    print('epoch',epoch+1,epoch_loss.item())

epoch 1 283.1549072265625
epoch 2 276.42547607421875
epoch 3 277.15472412109375
epoch 4 276.8977355957031
epoch 5 277.4761962890625
epoch 6 277.38604736328125
epoch 7 277.14813232421875
epoch 8 278.9856262207031
epoch 9 271.53338623046875
epoch 10 274.1083679199219
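
To see how many pairs each step produces, the sketch below builds same-class pairs with `combinations` and cross-class pairs with `product` on small made-up index lists (the index values are placeholders, not the notebook's labels).

```python
from itertools import combinations, product

# hypothetical index lists for the three classes
g1, g2, g3 = [0, 1, 2], [3, 4], [5, 6, 7, 8]

same_class = {
    "g1": list(combinations(g1, 2)),
    "g2": list(combinations(g2, 2)),
    "g3": list(combinations(g3, 2)),
}
cross_class = {
    "g1/g2": list(product(g1, g2)),
    "g1/g3": list(product(g1, g3)),
    "g2/g3": list(product(g2, g3)),
}

for name, pairs in {**same_class, **cross_class}.items():
    print(name, len(pairs), pairs[:2])
```

A class with n items yields n(n-1)/2 same-class pairs, and two classes with m and n items yield m·n cross-class pairs, so the number of training pairs grows quadratically with the number of labeled samples.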


# 4. Labeling via few shot learned model

We then store the index of `test` (the data that is not labeled yet) in the `tests` variable.

In [None]:
# initiate test
tests = test.index

Finally, we switch the model from training mode to evaluation mode and, for each test item, compute its similarity to each class to determine the final class. The variables `s1`, `s2`, and `s3` store scores indicating how similar the test item is to classes `g1`, `g2`, and `g3`, respectively. For instance, the similarities between the test item and every item in `g1` are summed and stored in `s1`. We apply a sigmoid function to the model’s output so that each score lies between 0 and 1; because cosine similarity ranges from -1 to 1, each sigmoid value falls between roughly 0.27 and 0.73. The test item is assigned to the class with the largest summed score.

In [None]:
f_model.eval()
for idx in tqdm(tests):
    # summed sigmoid similarity between the test item and every labeled item of each class
    scores = []
    for group in (g1, g2, g3) :
        s = 0
        for i1 in group :
            data = df.iloc[[idx, i1], 2]
            data = tuple(d.to(device) for d in data)
            s += torch.sigmoid(f_model(data)).detach().item()
        scores.append(s)

    # assign the class whose summed similarity is largest (column -4 is 'accuracy')
    df.iat[idx,-4] = scores.index(max(scores))

100%|██████████| 2005/2005 [02:37<00:00, 12.73it/s]
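
The decision rule can be sketched without the model. In the example below, the similarity values are invented stand-ins for the model's cosine outputs, and `label_from_similarities` is a hypothetical helper that follows the same sum-then-pick-the-largest logic.

```python
import math

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

def label_from_similarities(similarities_per_class):
    """Sum sigmoid(similarity) per class and return (index of largest sum, all sums)."""
    scores = [sum(sigmoid(s) for s in sims) for sims in similarities_per_class]
    return scores.index(max(scores)), scores

# made-up cosine similarities between one test item and the items of classes 0, 1, 2
similarities = [
    [-0.4, -0.2, 0.1],
    [0.6, 0.8, 0.7],
    [0.0, 0.2, -0.1],
]
label, scores = label_from_similarities(similarities)
print(label, [round(s, 3) for s in scores])
```

Because the scores are sums rather than averages, a class with more labeled examples contributes more terms; dividing each sum by the class size is one possible variation if the labeled classes are unbalanced.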


In [None]:
print(df['accuracy'].value_counts())
df

1    708
0    679
2    648
Name: accuracy, dtype: int64


| | file_path | output_path | vector | accuracy | completeness | fluency | prosodic |
|---|---|---|---|---|---|---|---|
| 0 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0298, device='cuda:0'), tensor(-0.... | 1 | 2 | 2 | 2 |
| 1 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0472, device='cuda:0'), tensor(-0.... | 1 | 0 | 1 | 2 |
| 2 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0402, device='cuda:0'), tensor(0.0... | 0 | 0 | 0 | 2 |
| 3 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(0.1732, device='cuda:0'), tensor(-0.1... | 2 | 0 | 1 | 2 |
| 4 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(0.0178, device='cuda:0'), tensor(-0.1... | 1 | 2 | 1 | 1 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 2030 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(0.0493, device='cuda:0'), tensor(-0.1... | 1 | 1 | 0 | 0 |
| 2031 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(0.1448, device='cuda:0'), tensor(-0.0... | 0 | 0 | 2 | 1 |
| 2032 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(-0.0594, device='cuda:0'), tensor(-0.... | 2 | 0 | 0 | 2 |
| 2033 | /content/drive/My Drive/03. AI/kaggle_archive2... | | [[tensor(0.2739, device='cuda:0'), tensor(-0.0... | 2 | 1 | 2 | 0 |



Finally, we save the data frame for later use.

In [None]:
df.to_pickle('your_own_path/audio_reference_scored.pkl')