In [None]:
# Mount Google Drive so the project folder is reachable from the Colab runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
cd 'drive/MyDrive/ColabNotebooks/HDA'

/content/drive/MyDrive/ColabNotebooks/HDA


In [None]:
!pip install tensorflow-io
!pip install psutil
!pip install keras-tuner
!pip install einops

Collecting tensorflow-io
  Downloading tensorflow_io-0.36.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (49.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.4/49.4 MB[0m [31m16.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-io
Successfully installed tensorflow-io-0.36.0


In [None]:
import threading
import psutil
import random
import time
import subprocess
import sys
import seaborn as sns
import pandas as pd
import os
import numpy as np
from config import PREPROCESSING_PATH ,DATASET_SPLIT_PATH
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
from scipy.io import wavfile
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from keras_tuner import BayesianOptimization, HyperModel
from einops.layers.tensorflow import Rearrange

from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix,accuracy_score

In [None]:
# Make the project's preprocessing modules importable. The membership check
# keeps sys.path free of duplicate entries when the cell is re-run.
if PREPROCESSING_PATH not in sys.path:
    sys.path.append(PREPROCESSING_PATH)

In [None]:
import preprocessing_tf
import evaluation

In [None]:
# Seed every random number generator this notebook has imported so that runs
# are repeatable: Python's `random`, NumPy's global RNG and TensorFlow.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:

# Report the accelerator count and the TensorFlow release in use.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Number of GPU:", len(gpu_devices))
print("Version:", tf.__version__)

Number of GPU: 0
Version: 2.15.0


# Dataset Loading

To speed up training, the dataset archive is unzipped onto the Colab runtime's local disk, which avoids slow reads from Google Drive. The archive is already divided into train, validation, and test folders.

In [None]:
# Work from the project folder on Drive; the next cell unzips
# project_data_split.zip using a path relative to this directory.
%cd /content/drive/MyDrive/ColabNotebooks/HDA

/content/drive/MyDrive/ColabNotebooks/HDA


In [None]:
!unzip project_data_split.zip -d /content/data

In [None]:
# Deliberately replaces the DATASET_SPLIT_PATH imported from `config`: from here
# on the notebook reads the copy that was unzipped into /content/data.
DATASET_SPLIT_PATH = "/content/data/project_data_split"

# Create train and validation dataset

Construct dataframes that include the file path and the corresponding spoken command (label) for each audio sample. The dataset comprises audio samples of 25 keywords: `backward`, `down`, `eight`, `five`, `follow`, `forward`, `four`, `go`, `learn`, `left`, `nine`, `no`, `off`, `on`, `one`, `right`, `seven`, `six`, `stop`, `three`, `two`, `up`, `visual`, `yes`, `zero`. Additionally, it contains 10 words (`bed`, `bird`, `cat`, `dog`, `happy`, `house`, `marvin`, `sheila`, `tree`, `wow`) that the model should not recognize as keywords.

To facilitate model training and evaluation, the labels are appropriately mapped: labels corresponding to the 25 keywords are retained in their original form, signifying that these are the commands the model is expected to recognize. Conversely, the labels for the 10 non-keyword words are mapped to a single class named "unknown". This approach consolidates these distinct non-keyword labels into a single category, simplifying the model's task by reducing the classification scope to the keywords and an "unknown" class for any non-keyword utterances.

In [None]:
# One dataframe per split, built from the matching sub-folder of the dataset.
train_df, val_df = (
    preprocessing_tf.get_file_list(os.path.join(DATASET_SPLIT_PATH, split_name))
    for split_name in ("train", "validation")
)

In [None]:
# Peek at the first rows: file path, original label and mapped label.
train_df.head()

Unnamed: 0,filepath,label,mapped_label
0,/content/data/project_data_split/train/down/20...,down,down
1,/content/data/project_data_split/train/down/a2...,down,down
2,/content/data/project_data_split/train/down/1d...,down,down
3,/content/data/project_data_split/train/down/a8...,down,down
4,/content/data/project_data_split/train/down/3a...,down,down


In [None]:
# String tensors holding the training file paths and their mapped labels.
file_paths, labels = (
    tf.constant(train_df[column].values)
    for column in ('filepath', 'mapped_label')
)

In [None]:
# Learn the label vocabulary from the training labels. No out-of-vocabulary
# index is reserved (num_oov_indices=0).
label_lookup = tf.keras.layers.StringLookup(num_oov_indices=0)
label_lookup.adapt(labels)

# Integer class ids, aligned element-by-element with `file_paths`.
numeric_labels = label_lookup(labels)

# Dataset yielding (file path, class id) pairs.
train_pairs = (file_paths, numeric_labels)
train_dataset = tf.data.Dataset.from_tensor_slices(train_pairs)

In [None]:
# Validation labels are encoded with the lookup fitted on the training labels,
# so class ids agree between the two splits.
file_paths_val, labels_val = (
    tf.constant(val_df[column].values)
    for column in ('filepath', 'mapped_label')
)
numeric_labels_val = label_lookup(labels_val)

validation_pairs = (file_paths_val, numeric_labels_val)
validation_dataset = tf.data.Dataset.from_tensor_slices(validation_pairs)

In [None]:
# Sanity check: the encoded validation labels (one integer id per file).
numeric_labels_val

<tf.Tensor: shape=(9981,), dtype=int64, numpy=array([7, 7, 7, ..., 1, 1, 1])>

# Preprocessing the Datasets

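The training and validation datasets undergo preprocessing through our established pipeline. For the baseline model, the preprocessing involves only two steps: padding the data to ensure uniformity in size, which is essential for the model's input requirements, and converting the audio files into spectrograms. For the attention model trained in this notebook the pipeline is additionally called with `noise=True` and `mfcc=True`: each clip is padded to 16000 samples, background noise is added, and the result is converted into MFCC features of shape `(98, 40, 1)`.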

In [None]:
def _to_features(fp, lbl):
    """Run one (file path, label) pair through the shared preprocessing call."""
    # NOTE(review): noise=True is applied to the validation split as well as to
    # the training split -- confirm that this is intended.
    return preprocessing_tf.preprocess_map_new(fp, lbl, noise=True, mfcc=True)


# The file list is grouped by label, and the 10000-element shuffle buffer used
# below is smaller than the training set, so on its own it would produce
# batches dominated by a few classes. Shuffle the (cheap) file paths across the
# whole training set once, before the expensive map + cache.
num_train_files = int(train_dataset.cardinality())
train_spectrogram_ds = (
    train_dataset
    .shuffle(num_train_files, seed=42, reshuffle_each_iteration=False)
    .map(_to_features, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)  # cheap per-epoch reshuffle of the cached features
    .prefetch(tf.data.AUTOTUNE)
)

val_spectrogram_ds = (
    validation_dataset
    .map(_to_features, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)

Padding shape: (16000,)
/content/data/project_data_split/_background_noise_/doing_the_dishes.wav
Noisy shape: (16000,)
log_mel_spectrogram shape: (98, 40)
Mfcc shape: (98, 40, 1)
Padding shape: (16000,)
/content/data/project_data_split/_background_noise_/exercise_bike.wav
Noisy shape: (16000,)
log_mel_spectrogram shape: (98, 40)
Mfcc shape: (98, 40, 1)


In [None]:
batch_size = 32

# Prefetch after batching so that complete batches are prepared in the
# background while the model is busy with the previous one.
train_spectrogram_ds = train_spectrogram_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
val_spectrogram_ds = val_spectrogram_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Per-example feature shape: drop the leading batch axis of the element spec.
features_spec = train_spectrogram_ds.element_spec[0]
input_shape = features_spec.shape[1:]
print('Input shape:', input_shape)

# One output class per entry of the label vocabulary.
vocabulary = label_lookup.get_vocabulary()
num_labels = len(vocabulary)
print('Number of labels:', num_labels)

Input shape: (98, 40, 1)
Number of labels: 26


# Attention Model

In [None]:
class PreNorm(tf.keras.layers.Layer):
    """Apply layer normalisation to the input before handing it to `fn`.

    Args:
        dim: Not used by this layer.
        fn: Layer (or other callable) that receives the normalised input.
        epsilon: Epsilon passed to `LayerNormalization`.
    """

    def __init__(self, dim, fn, epsilon=1e-6):
        super(PreNorm, self).__init__()
        self.norm = layers.LayerNormalization(epsilon=epsilon)
        self.fn = fn

    def call(self, x, **kwargs):
        """Normalise `x`, then forward it (plus any extra kwargs) to `fn`."""
        normalised = self.norm(x)
        return self.fn(normalised, **kwargs)

class FeedForward(tf.keras.layers.Layer):
    """Two-layer MLP: Dense(hidden_dim, GELU) -> Dropout -> Dense(dim) -> Dropout.

    Args:
        dim: Number of output units of the second Dense layer.
        hidden_dim: Number of units of the first (GELU) Dense layer.
        dropout: Dropout rate used after each Dense layer.
    """

    def __init__(self, dim, hidden_dim, dropout=0.):
        super(FeedForward, self).__init__()
        stack = [
            layers.Dense(hidden_dim, activation='gelu'),
            layers.Dropout(dropout),
            layers.Dense(dim),
            layers.Dropout(dropout),
        ]
        self.net = tf.keras.Sequential(stack)

    def call(self, x):
        """Run `x` through the MLP."""
        projected = self.net(x)
        return projected

class Attention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product self-attention over a token sequence.

    The input is expected as (batch, tokens, features); the output has the same
    leading axes and `dim` features (or `heads * dim_head` when the output
    projection is skipped, which only happens when that equals `dim`).

    Args:
        dim: Feature size produced by the output projection.
        heads: Number of attention heads.
        dim_head: Feature size of each head.
        dropout: Dropout rate applied after the output projection.
    """

    def __init__(self, dim, heads=8, dim_head=64, dropout=0.):
        super().__init__()
        inner_dim = dim_head * heads
        # With a single head of width `dim` the head output already has the
        # target size, so no output projection is needed.
        project_out = not (heads == 1 and dim_head == dim)

        self.heads = heads
        self.dim_head = dim_head
        self.scale = dim_head ** -0.5

        self.attend = layers.Softmax(axis=-1)
        self.to_qkv = layers.Dense(inner_dim * 3, use_bias=False)

        if project_out:
            self.to_out = tf.keras.Sequential([
                layers.Dense(dim),
                layers.Dropout(dropout)
            ])
        else:
            # Keep a reference to the function; calling tf.identity() with no
            # argument here would raise a TypeError.
            self.to_out = tf.identity

    def _split_heads(self, t, batch, tokens):
        """Reshape (batch, tokens, heads * dim_head) to (batch, heads, tokens, dim_head)."""
        t = tf.reshape(t, (batch, tokens, self.heads, self.dim_head))
        return tf.transpose(t, perm=[0, 2, 1, 3])

    def call(self, x):
        """Return the attended sequence for `x` of shape (batch, tokens, features)."""
        # The batch size is None while Keras traces the model, so it has to be
        # read from the runtime shape. The token count is taken statically
        # when it is known, which keeps static shape information downstream.
        batch = tf.shape(x)[0]
        tokens = x.shape[1] if x.shape[1] is not None else tf.shape(x)[1]

        q, k, v = [
            self._split_heads(t, batch, tokens)
            for t in tf.split(self.to_qkv(x), 3, axis=-1)
        ]

        # Similarity of every query token with every key token, per head.
        dots = tf.einsum('bhqd,bhkd->bhqk', q, k) * self.scale
        attn = self.attend(dots)

        # Attention-weighted sum of the values: contract over the key axis.
        out = tf.einsum('bhqk,bhkd->bhqd', attn, v)

        # Back to (batch, tokens, heads * dim_head): move the head axis next to
        # the feature axis before merging them.
        out = tf.transpose(out, perm=[0, 2, 1, 3])
        out = tf.reshape(out, (batch, tokens, self.heads * self.dim_head))
        return self.to_out(out)

class TransformerBlock(tf.keras.layers.Layer):
    """Pre-norm attention followed by a pre-norm feed-forward, each with a residual add.

    Args:
        dim: Feature size passed to the attention and feed-forward sub-layers.
        heads: Number of attention heads.
        dim_head: Feature size of each attention head.
        mlp_dim: Hidden size of the feed-forward sub-layer.
        dropout: Dropout rate forwarded to both sub-layers.
    """

    def __init__(self, dim, heads, dim_head, mlp_dim, dropout=0.):
        super(TransformerBlock, self).__init__()
        attention = Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout)
        self.attn = PreNorm(dim, attention)
        feed_forward = FeedForward(dim, mlp_dim, dropout=dropout)
        self.ff = PreNorm(dim, feed_forward)

    def call(self, x):
        """Apply both residual sub-layers to `x` and return the result."""
        after_attention = self.attn(x) + x
        after_feed_forward = self.ff(after_attention) + after_attention
        return after_feed_forward

In [None]:
class ViT(tf.keras.Model):
    """Vision-Transformer-style classifier over 2-D feature maps.

    The input is cut into non-overlapping patches, each patch is linearly
    embedded, a learnable class token and position embedding are added, the
    sequence passes through `depth` TransformerBlocks, and a LayerNorm + Dense
    head produces one unnormalised score (logit) per class.

    Args:
        image_size: Input size; only entries 0 (height) and 1 (width) are read.
        patch_size: (patch_height, patch_width).
        num_classes: Number of output logits.
        dim: Token embedding size.
        depth: Number of TransformerBlocks.
        heads: Number of attention heads per block.
        mlp_dim: Hidden size of the feed-forward part of each block.
        pool: 'mean' averages all tokens; any other value reads the class token.
        channels: Not used; the patch embedding infers its input size. Kept so
            existing callers that pass it keep working.
        dim_head: Feature size of each attention head.
        dropout: Dropout rate inside the transformer blocks.
        emb_dropout: Dropout rate applied to the embedded token sequence.
    """

    def __init__(self, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool='cls', channels=1, dim_head=64, dropout=0., emb_dropout=0.1):
        super().__init__()
        num_patches = (image_size[0] // patch_size[0]) * (image_size[1] // patch_size[1])

        self.to_patch_embedding = tf.keras.Sequential([
            Rearrange('b (h p1) (w p2) c -> b (h w) (p1 p2 c)', p1=patch_size[0], p2=patch_size[1]),
            layers.Dense(dim),
        ])

        # Both learnable embeddings are created through add_weight so they are
        # registered the same way. The standard-normal initialiser matches the
        # tf.random.normal() initial value used previously.
        self.pos_embedding = self.add_weight(
            name='pos_embedding',
            shape=(1, num_patches + 1, dim),
            initializer=tf.keras.initializers.RandomNormal(mean=0.0, stddev=1.0),
            trainable=True,
        )
        self.cls_token = self.add_weight(
            name='cls_token',
            shape=(1, 1, dim),
            initializer='random_normal',
            trainable=True,
        )

        self.dropout = layers.Dropout(emb_dropout)

        self.transformer = tf.keras.Sequential([
            TransformerBlock(dim, heads, dim_head, mlp_dim, dropout) for _ in range(depth)
        ])

        self.pool = pool
        self.to_latent = tf.identity

        self.mlp_head = tf.keras.Sequential([
            layers.LayerNormalization(epsilon=1e-6),
            layers.Dense(num_classes)
        ])

    def call(self, img):
        """Return class logits of shape (batch, num_classes) for a batch of inputs."""
        x = self.to_patch_embedding(img)

        # The static batch size is None while Keras traces the model (the
        # dataset is batched without drop_remainder), so use the runtime shape.
        batch = tf.shape(x)[0]
        cls_tokens = tf.tile(self.cls_token, [batch, 1, 1])
        x = tf.concat([cls_tokens, x], axis=1)

        # One position vector per token (class token included).
        x += self.pos_embedding[:, :tf.shape(x)[1]]
        x = self.dropout(x)

        x = self.transformer(x)

        if self.pool == 'mean':
            x = tf.reduce_mean(x, axis=1)
        else:  # 'cls'
            x = x[:, 0]

        x = self.to_latent(x)
        return self.mlp_head(x)

In [None]:
class ViTHyperModel(HyperModel):
    """KerasTuner hypermodel that builds and compiles a `ViT` for a given search point.

    Args:
        shape: Input shape handed to `ViT` as `image_size`.
        num_classes: Number of output classes.
    """

    def __init__(self, shape, num_classes):
        # Let the base class set up its own state before adding ours.
        super().__init__()
        self.shape = shape
        self.num_classes = num_classes

    def build(self, hp):
        """Create a compiled `ViT` using the hyperparameters sampled in `hp`.

        Tuned values: `dim`, `depth`, `heads`, `mlp_dim`, `dropout` and
        `emb_dropout`. The patch size (2, 5) and learning rate 1e-4 are fixed.
        """
        model = ViT(
            image_size=self.shape,
            patch_size=(2, 5),
            num_classes=self.num_classes,
            dim=hp.Int('dim', min_value=256, max_value=1024, step=256),
            depth=hp.Int('depth', min_value=3, max_value=6, step=1),
            heads=hp.Choice('heads', values=[8, 12, 16]),
            mlp_dim=hp.Choice('mlp_dim', values=[512, 1024, 2048]),
            dropout=hp.Float('dropout', min_value=0.1, max_value=0.5, step=0.1),
            emb_dropout=hp.Float('emb_dropout', min_value=0.1, max_value=0.5, step=0.1)
        )
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
            # The ViT head is a Dense layer without activation, i.e. it emits
            # logits, so the loss must apply the softmax itself.
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=['accuracy']
        )
        return model

In [None]:
# Shape of one example, as passed to the hypermodel below.
input_shape

TensorShape([98, 40, 1])

In [None]:
# Hypermodel bound to this dataset's feature shape and number of classes.
hypermodel = ViTHyperModel(shape=input_shape, num_classes=num_labels)

In [None]:
# Bayesian-optimisation search over the ViT hyperparameters, ranked by
# validation accuracy.
# NOTE(review): the saved output shows an existing ./ViT_BayesianOpt project
# being reloaded, so earlier trials are reused -- confirm that this is wanted
# after any change to the model code.
tuner_options = dict(
    objective='val_accuracy',
    max_trials=20,
    num_initial_points=2,
    seed=42,
    project_name='ViT_BayesianOpt',
)
tuner = BayesianOptimization(hypermodel, **tuner_options)

Reloading Tuner from ./ViT_BayesianOpt/tuner0.json


In [None]:
# Hyperparameter search: every trial trains for up to 20 epochs.
tuner.search(train_spectrogram_ds,
             validation_data=val_spectrogram_ds,
             epochs=20,
             verbose=1)

best_model = tuner.get_best_models(1)[0]

# A subclassed model creates its variables on the first call, and summary()
# raises until that has happened. Run one dummy example through the model so
# the layers exist before they are listed.
example_shape = val_spectrogram_ds.element_spec[0].shape[1:].as_list()
best_model(tf.zeros([1] + example_shape))
best_model.summary()

# Continue training the best candidate. Stop once validation accuracy has not
# improved for 10 epochs and roll back to the best epoch, so the model saved
# below is the best one seen rather than whatever the last epoch produced.
early_stopping = EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    patience=10,
    restore_best_weights=True,
)

best_model.fit(
    train_spectrogram_ds,
    validation_data=val_spectrogram_ds,
    epochs=100,
    callbacks=[early_stopping],
    verbose=1
)

best_model.save('ViT_retrained')

# References

**[Berg21]**

A. Berg, M. O'Connor, and M. T. Cruz, "Keyword Transformer: A Self-Attention Model for Keyword Spotting," Interspeech 2021.