In [1]:
import random
import logging # allows writing status messages to a file or output streams

import numpy as np 
import tensorflow as tf 
from tensorflow import keras
from tensorflow.keras import layers

# Raise TensorFlow's logger threshold to ERROR so lower-severity messages
# do not clutter the notebook output.
tf.get_logger().setLevel(logging.ERROR)
# Fix the global random seed (42) so repeated runs are comparable.
tf.keras.utils.set_random_seed(42)

2022-11-26 10:40:29.310685: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Configuration constants shared by the cells below.
MAX_DURATION = 1 # duration of each input audio clip fed to Wav2Vec2 (presumably in seconds)
SAMPLING_RATE = 16000 # number of audio samples recorded every second
BATCH_SIZE = 32 # batch size for training and evaluating the model
NUM_CLASSES = 10 # number of classes the classifier distinguishes
HIDDEN_DIM = 768 # dimension of the model output (768 is for wav2vec2-base)
MAX_SEQ_LENGTH = MAX_DURATION * SAMPLING_RATE # max input length in samples: 1 x 16k
# Length of the frame axis used by mean_pool when it masks and averages the
# encoder output.
# NOTE(review): presumably the number of Wav2Vec2 output frames produced for a
# MAX_SEQ_LENGTH-sample input -- re-check if MAX_DURATION or SAMPLING_RATE change.
MAX_FRAMES = 49
MAX_EPOCHS = 2 # maximum number of training epochs


# Identifier of the pretrained checkpoint handed to `from_pretrained`.
MODEL_CHECKPOINT = "facebook/wav2vec2-base-960h"

In [3]:
from datasets import load_dataset
# Load the "ks" configuration of the "superb" dataset.
# NOTE(review): presumably the keyword-spotting task built on Speech Commands
# v1, as the variable name suggests -- confirm against the dataset card.
speech_command_v1 = load_dataset("superb", "ks")

  from .autonotebook import tqdm as notebook_tqdm


KeyboardInterrupt: 

In [None]:
# Bare expression: lets the notebook render the freshly loaded dataset's repr.
speech_command_v1

In [None]:
# Re-split the original "train" split into train/test parts (65% / 35%),
# stratified on the label column.
speech_command_v1 = speech_command_v1["train"].train_test_split(
    train_size=0.65, test_size=0.35, stratify_by_column="label"
)

# Remove the "_silence_" and "_unknown_" classes from both splits.
# The two ids are looked up once, and each row's label is tested against BOTH
# of them. Writing `label != (silence_id and unknown_id)` would compare
# against a single id only, because `a and b` evaluates to one of its operands.
_label_names = speech_command_v1["train"].features["label"].names
_excluded_label_ids = frozenset(
    _label_names.index(name) for name in ("_silence_", "_unknown_")
)
speech_command_v1 = speech_command_v1.filter(
    lambda example: example["label"] not in _excluded_label_ids
)

# Truncate each split to a whole multiple of BATCH_SIZE so every batch is full.
for _split in ("train", "test"):
    _usable_rows = (len(speech_command_v1[_split]) // BATCH_SIZE) * BATCH_SIZE
    speech_command_v1[_split] = speech_command_v1[_split].select(range(_usable_rows))

In [None]:
# Bare expression: lets the notebook render the dataset's repr at this point.
speech_command_v1

In [None]:
# Build both directions of the class-name <-> id lookup. Ids are stored as
# strings and follow the order in which the dataset declares its label names.
labels = speech_command_v1["train"].features["label"].names
label2id = {name: str(position) for position, name in enumerate(labels)}
id2label = {str(position): name for position, name in enumerate(labels)}

print(id2label)

Pre-processing

In [None]:
# Import the necessary libraries

from transformers import AutoFeatureExtractor

# Load the feature extractor that belongs to MODEL_CHECKPOINT and ask it to
# return an attention mask alongside the processed audio values.
feature_extractor = AutoFeatureExtractor.from_pretrained(
    MODEL_CHECKPOINT, return_attention_mask=True
)

def preprocess_function(examples):
    """Run the module-level feature extractor over one batch of dataset rows.

    Args:
        examples: Batch mapping whose "audio" entry is a sequence of dicts,
            each holding a raw waveform under the "array" key.

    Returns:
        Whatever ``feature_extractor`` returns for the batch, unchanged.
    """
    waveforms = []
    for clip in examples["audio"]:
        waveforms.append(clip["array"])

    # Truncation to MAX_SEQ_LENGTH samples and padding are both requested.
    # NOTE(review): padding=True presumably pads to the longest clip in each
    # mapped batch rather than to MAX_SEQ_LENGTH -- confirm all batches end up
    # with the same width before they are stacked into arrays.
    extractor_options = dict(
        sampling_rate=feature_extractor.sampling_rate,
        max_length=MAX_SEQ_LENGTH,
        truncation=True,
        padding=True,
    )
    return feature_extractor(waveforms, **extractor_options)

# Extract features batch by batch. The raw "audio" and "file" columns are
# dropped because they are not necessary while training.
processed_speech_commands_v1 = speech_command_v1.map(
    preprocess_function,
    batched=True,
    remove_columns=["audio", "file"],
)

# Shuffle each split (seed 42), switch it to NumPy formatting and slice with
# [:] to load the whole split at once as a dict of NumPy arrays.
train, test = (
    processed_speech_commands_v1[split_name]
    .shuffle(seed=42)
    .with_format("numpy")[:]
    for split_name in ("train", "test")
)


Wav2Vec2.0 / Classification Head

In [None]:
from transformers import TFWav2Vec2Model
# Model Definition

def mean_pool(hidden_states, feature_lengths):
    """Average encoder frames over the frame axis, ignoring padded frames.

    Args:
        hidden_states: Float tensor indexed as (batch, frame, feature). The
            frame axis must be MAX_FRAMES long; batch and feature sizes are
            taken from the tensor itself.
        feature_lengths: Per-example number of valid (non-padding) frames.

    Returns:
        Tensor of shape (batch, feature): for each example, the mean of its
        first ``feature_lengths[i]`` frames.
    """
    # True for the first `feature_lengths[i]` frames of each row, False after.
    # sequence_mask already yields exactly this prefix mask, so it is used
    # directly without any further transformation.
    valid_frames = tf.sequence_mask(feature_lengths, maxlen=MAX_FRAMES)

    # Element-wise select: keep valid frames, replace padded frames with 0.
    # tf.where broadcasts the (batch, frame, 1) mask across the feature axis,
    # so no fixed batch size or hidden size has to be spelled out here.
    masked_states = tf.where(tf.expand_dims(valid_frames, -1), hidden_states, 0.0)

    # Number of valid frames per example, kept as a (batch, 1) column so the
    # division below broadcasts across the feature axis.
    frame_counts = tf.math.reduce_sum(
        tf.cast(valid_frames, hidden_states.dtype), axis=1, keepdims=True
    )

    # Sum over the frame axis, then divide by the per-example valid count.
    return tf.math.reduce_sum(masked_states, axis=1) / frame_counts

class TFWav2Vec2ForAudioClassification(layers.Layer):
    """Wav2Vec2 backbone followed by mean pooling, dropout and a softmax head.

    Args:
        model_checkpoint: Checkpoint identifier passed to
            ``TFWav2Vec2Model.from_pretrained``.
        num_classes: Number of units in the final softmax layer.
    """

    def __init__(self, model_checkpoint, num_classes):
        super().__init__()
        # Instantiate the Wav2Vec2 model without a classification head.
        # The attribute must be spelled `wav2vec2` everywhere: `call` reads it
        # under this exact name.
        self.wav2vec2 = TFWav2Vec2Model.from_pretrained(
            model_checkpoint, apply_spec_augment=False, from_pt=True
        )
        # Fallback pooling used when no attention mask is supplied.
        self.pooling = layers.GlobalAveragePooling1D()
        # Dropout applied before the classification head.
        self.intermediate_layer_dropout = layers.Dropout(0.5)

        # Classification head.
        self.final_layer = layers.Dense(num_classes, activation="softmax")

    def call(self, inputs):
        """Classify a batch of audio inputs.

        Args:
            inputs: Mapping with an "input_values" entry and, optionally, an
                "attention_mask" entry.

        Returns:
            The softmax layer's output for the pooled encoder states.
        """
        # Take the first element of the backbone's output, corresponding to
        # the output of the last layer of Wav2Vec2.
        hidden_states = self.wav2vec2(inputs["input_values"])[0]

        # A missing mask yields None here and falls through to plain pooling.
        attention_mask = inputs.get("attention_mask")

        if tf.is_tensor(attention_mask):
            # Attention mask present: mean-pool only the unmasked output frames.
            # Length of each audio input = sum of its attention mask.
            audio_lengths = tf.cumsum(attention_mask, -1)[:, -1]

            # Number of Wav2Vec2 output frames for each audio input length.
            feature_lengths = self.wav2vec2.wav2vec2._get_feat_extract_output_lengths(
                audio_lengths
            )
            pooled_state = mean_pool(hidden_states, feature_lengths)
        else:
            # No attention mask: mean-pool over all output frames.
            pooled_state = self.pooling(hidden_states)

        intermediate_state = self.intermediate_layer_dropout(pooled_state)
        final_state = self.final_layer(intermediate_state)

        return final_state