# **Command Word Detection Using Speech-to-Text**
Automatic speech recognition (ASR) involves transcribing audio into text. ASR is a sequence-to-sequence problem where the audio signal is a sequence of feature vectors and the text is a sequence of characters, words, or tokens.

ASR shows promise as a flexible command word detection system. Many wake word detectors are optimized for a single word or phrase. In this notebook, we will implement a wake word detector based on the Transformer model detailed in [*Attention Is All You Need*](https://papers.nips.cc/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf).

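Because a Transformer-based ASR model transcribes arbitrary speech to text, specific commands can be identified by searching the transcript rather than by training a separate detector for each phrase.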
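
As a rough illustration of how a transcript can drive command detection, the sketch below scans ASR output text for a set of command phrases. The function name `find_commands` and the command list are hypothetical and are not part of this notebook's pipeline.

```python
import re

def find_commands(transcript, commands):
    """Return the commands that occur as whole words or phrases in the transcript."""
    # Lowercase the text and replace everything except letters and spaces,
    # so that punctuation does not block a match.
    normalized = re.sub(r"[^a-z ]+", " ", transcript.lower())
    normalized = " ".join(normalized.split())
    found = []
    for command in commands:
        # The \b anchors keep "stop" from matching inside "stopping".
        if re.search(r"\b" + re.escape(command.lower()) + r"\b", normalized):
            found.append(command)
    return found

# Hypothetical command set, chosen only for illustration.
COMMANDS = ["stop", "go", "turn left"]
print(find_commands("Please turn left, then stop.", COMMANDS))  # ['stop', 'turn left']
```

Matching on the transcript means the command set can be changed without retraining the model, at the cost of depending on transcription accuracy.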

In [1]:
!pip install audiomentations

Collecting audiomentations
  Downloading https://files.pythonhosted.org/packages/fb/e1/3078fe444be2a100d804ee1296115367c27fa1dfa6298bf4155f77345822/audiomentations-0.16.0-py3-none-any.whl
Installing collected packages: audiomentations
Successfully installed audiomentations-0.16.0


In [2]:
import os
import pathlib
import random

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf

from glob import glob
from tensorflow import keras
from tensorflow.keras import layers
from IPython import display
from audiomentations import Compose, AddBackgroundNoise, AddGaussianNoise, AddShortNoises, Gain, PitchShift

In [3]:
# Set a random seed for reproducibility:
random_seed = 10
tf.random.set_seed(random_seed)
np.random.seed(random_seed)

## **1. Importing the Dataset**
In this section we'll import the [*LJSpeech dataset*](https://www.tensorflow.org/datasets/catalog/ljspeech). This dataset contains `.wav` audio files of a single speaker reading passages from various books. The recordings were sourced from the LibriVox project and are in the public domain.

In [4]:
# Get the data:
keras.utils.get_file(
    os.path.join(os.getcwd(),"data.tar.gz"),
    "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2",
    extract=True,
    archive_format="tar",
    cache_dir="."
)

saveto = "./datasets/LJSpeech-1.1"
wavs = glob("{}/**/*.wav".format(saveto), recursive=True)

# Map each clip ID to its normalized transcript (the third "|"-separated field):
id_to_text = {}
with open(os.path.join(saveto, "metadata.csv"), encoding="utf-8") as f:
  for line in f:
    fields = line.strip().split("|")
    id_to_text[fields[0]] = fields[2]

Downloading data from https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2


In [5]:
# Set up the default path for the speech commands data:
data_dir = pathlib.Path("data/mini_speech_commands")

# Check if the data exists locally:
if not os.path.exists(data_dir):
  # Get the files:
  tf.keras.utils.get_file(
      "mini_speech_commands.zip",
      origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir=".",
      cache_subdir="data"
  )

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip


In [6]:
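def get_data(wavs, id_to_text, max_length=50):
  # Keep only the clips whose transcript is shorter than max_length characters:
  data = []
  for w in wavs:
    # The clip ID is the file name without its extension:
    clip_id = os.path.splitext(os.path.basename(w))[0]
    if len(id_to_text[clip_id]) < max_length:
      data.append({"audio": w, "text": id_to_text[clip_id]})

  return data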

In [7]:
class VectorizeChar:
  def __init__(self, max_length=50):
    self.vocab = (
        ["-", "#", "<", ">"]
            + [chr(i + 96) for i in range(1, 27)]
            + [" ", ".", ",", "?"]
        )
    self.max_length = max_length
    self.char_to_idx = {}
    for j, ch in enumerate(self.vocab):
        self.char_to_idx[ch] = j
  
  def __call__(self, text):
    text = text.lower()
    text = text[: self.max_length - 2]
    text = "<" + text + ">"
    pad_length = self.max_length - len(text)
    return [self.char_to_idx.get(ch, 1) for ch in text] + [0] * pad_length

  def get_vocabulary(self):
    return self.vocab
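
To make the encoding concrete, here is a standalone sketch of the same scheme applied to a short string. The helper name `encode_text` and the small `max_length` are illustrative assumptions; the logic mirrors `VectorizeChar.__call__` above.

```python
# Same vocabulary layout as VectorizeChar: padding "-", unknown "#",
# start "<", end ">", then a-z, then space and punctuation.
VOCAB = ["-", "#", "<", ">"] + [chr(i + 96) for i in range(1, 27)] + [" ", ".", ",", "?"]
CHAR_TO_IDX = {ch: i for i, ch in enumerate(VOCAB)}

def encode_text(text, max_length=12):
    # Lowercase, truncate to leave room for the start/end markers, then wrap.
    text = "<" + text.lower()[: max_length - 2] + ">"
    # Characters outside the vocabulary map to index 1 ("#").
    ids = [CHAR_TO_IDX.get(ch, 1) for ch in text]
    # Right-pad with index 0 ("-") up to max_length.
    return ids + [0] * (max_length - len(ids))

print(encode_text("Hi 5!"))  # [2, 11, 12, 30, 1, 1, 3, 0, 0, 0, 0, 0]
```

The digit and the exclamation mark are not in the vocabulary, so both become the unknown index 1.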

We'll now check out the dataset to ensure the data loaded correctly.

In [8]:
max_target_length = 200
data = get_data(wavs, id_to_text, max_target_length)
vectorizer = VectorizeChar(max_target_length)
print("Vocab size: {}".format(len(vectorizer.get_vocabulary())))
print("Dataset size: {}".format(len(data)))

Vocab size: 34
Dataset size: 13100


## **2. Data Preparation & Analysis**
Our next step is to prepare speech data for analysis and model training. Audio files are read as binary. We need to convert each file to a numerical tensor.

We use the [`tf.audio.decode_wav`](https://www.tensorflow.org/api_docs/python/tf/audio/decode_wav) function to decode each audio file into a float tensor of samples.

The `.wav` files contain time series data with a fixed number of samples per second. Each sample represents the audio signal amplitude at a given time. Files in the `LJSpeech` dataset are stored as 16-bit PCM, so raw sample values range from -32768 to 32767. The sampling rate is 22.05 kHz.

Note that `tf.audio.decode_wav` normalizes values to the range [-1.0, 1.0]. We also build an augmented copy of the `LJSpeech` training data using transformations such as Gaussian noise, gain, and pitch shift.
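
The arithmetic behind the normalization and the spectrogram shape can be sketched in plain Python. The helper names are hypothetical, the 1/32768 scale factor is assumed as the usual convention for 16-bit PCM, and the STFT parameters are the ones passed to `tf.signal.stft` in the next cells (with no end padding).

```python
def pcm16_to_float(sample):
    """Map a signed 16-bit sample to a float in [-1.0, 1.0)."""
    # Assumption: scale by 1/32768, the common convention for 16-bit PCM.
    return sample / 32768.0

def stft_shape(num_samples, frame_length=200, frame_step=80, fft_length=256):
    """Return (frames, frequency bins) for an STFT without end padding."""
    num_bins = fft_length // 2 + 1
    if num_samples < frame_length:
        return (0, num_bins)
    # One frame at offset 0, plus one for every full step that still fits.
    num_frames = 1 + (num_samples - frame_length) // frame_step
    return (num_frames, num_bins)

print(pcm16_to_float(-32768))  # -1.0
print(stft_shape(22050))       # (274, 129)
```

Each spectrogram row therefore holds 129 frequency bins, and the number of rows grows with the clip length at one frame per 80 samples.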

In [9]:
def create_text_dataset(data):
  texts = [_["text"] for _ in data]
  text_data = [vectorizer(t) for t in texts]
  text_data = tf.data.Dataset.from_tensor_slices(text_data)
  return text_data

In [10]:
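def path_to_audio(path, sample_rate=22050, augmentation=False):
  # Read the file and decode it to a mono float32 waveform in [-1.0, 1.0].
  # LJSpeech clips are sampled at 22.05 kHz, hence the default sample_rate:
  audio = tf.io.read_file(path)
  audio, _ = tf.audio.decode_wav(audio, desired_channels=1)
  audio = tf.squeeze(audio, axis=-1)
  if augmentation:
    # audiomentations operates on NumPy waveforms, so the transformations are
    # applied to the raw audio (before the STFT) through tf.numpy_function:
    transformations = Compose([
        AddGaussianNoise(),
        Gain(),
        PitchShift()
    ])
    audio = tf.numpy_function(
        lambda a: transformations(samples=a, sample_rate=sample_rate).astype(np.float32),
        [audio],
        tf.float32
    )
    audio.set_shape([None])
  # Compute the square-root magnitude spectrogram:
  stfts = tf.signal.stft(audio, frame_length=200, frame_step=80, fft_length=256)
  x = tf.math.pow(tf.abs(stfts), 0.5)
  # Normalize each frame to zero mean and unit variance:
  means = tf.math.reduce_mean(x, 1, keepdims=True)
  stdvs = tf.math.reduce_std(x, 1, keepdims=True)
  x = (x - means) / stdvs
  # Pad or truncate to a fixed number of frames:
  pad_length = 2754
  paddings = tf.constant([[0, pad_length], [0, 0]])
  x = tf.pad(x, paddings, "CONSTANT")[:pad_length, :]
  return x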

In [11]:
def create_audio_dataset(data, augmentation=False):
  files = [_["audio"] for _ in data]
  audio_data = tf.data.Dataset.from_tensor_slices(files)
  audio_data = audio_data.map(
      lambda a: path_to_audio(a, augmentation=augmentation), num_parallel_calls=tf.data.experimental.AUTOTUNE
  )
  return audio_data

In [12]:
def create_tf_dataset(data, batch_size=4, augmentation=False):
  audio_data = create_audio_dataset(data, augmentation)
  text_data = create_text_dataset(data)
  data = tf.data.Dataset.zip((audio_data, text_data))
  data = data.map(lambda x, y: {"source": x, "target": y})
  data = data.batch(batch_size)
  data = data.prefetch(tf.data.experimental.AUTOTUNE)
  return data

In [13]:
split = int(len(data) * 0.99)
train_data = data[:split]
test_data = data[split:]
raw_train_dataset = create_tf_dataset(train_data, batch_size=64)
aug_train_dataset = create_tf_dataset(train_data, batch_size=64, augmentation=True)
train_dataset = raw_train_dataset.concatenate(aug_train_dataset)
test_dataset = create_tf_dataset(test_data, batch_size=4)

## **3. Model Definition**
The Transformer architecture was introduced in [*Attention Is All You Need*](https://papers.nips.cc/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf) for machine translation and has since been applied to ASR. These models map one sequence (e.g., audio spectrogram frames) to another (e.g., characters in text).

A Transformer consists of an Encoder and a Decoder, each built from a stack of layers. The Encoder maps an input sequence to a set of features. The Decoder maps those features to the output sequence of interest. In our case, the Encoder maps audio spectrogram frames to features, which the Decoder maps to characters.

### **3.1 Transformer Input Layer**
In this section we'll define input layers for the Transformer model. The character input uses the sum of token and positional embeddings. The audio input passes through strided convolutional layers that downsample the spectrogram along the time axis.

In [14]:
class TokenEmbedding(layers.Layer):
  def __init__(self, vocab_size=1000, max_length=100, num_hid=64):
    super(TokenEmbedding, self).__init__()
    # Define the token embedding layer:
    self.token_embedding = tf.keras.layers.Embedding(vocab_size, num_hid)
    # Define the position embedding layer:
    self.position_embedding = tf.keras.layers.Embedding(max_length, num_hid)

  def call(self, x):
    # Get the input sequence length:
    max_length = tf.shape(x)[-1]
    # Generate a token embedding:
    x = self.token_embedding(x)
    # Get the positions in the sequence:
    p = tf.range(start=0, limit=max_length, delta=1)
    # Generate a position embedding:
    p = self.position_embedding(p)
    return x + p

In [15]:
class SpeechFeatureEmbedding(layers.Layer):
  def __init__(self, num_hid, max_length=100, kernel_size=11, strides=2):
    super(SpeechFeatureEmbedding, self).__init__()
    # Define the convolutional layers:
    self.conv1 = tf.keras.layers.Conv1D(num_hid, 
                                        kernel_size=kernel_size, 
                                        strides=strides, 
                                        padding="same", 
                                        activation="relu")
    self.conv2 = tf.keras.layers.Conv1D(num_hid, 
                                        kernel_size=kernel_size, 
                                        strides=strides, 
                                        padding="same", 
                                        activation="relu")
    self.conv3 = tf.keras.layers.Conv1D(num_hid, 
                                        kernel_size=kernel_size, 
                                        strides=strides, 
                                        padding="same", 
                                        activation="relu")
    # Define the positional embedding:
    self.position_embedding = layers.Embedding(max_length, num_hid)

  def call(self, x):
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.conv3(x)
    return x
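
The amount of downsampling follows from the stride: with `padding="same"`, each convolution produces `ceil(length / strides)` output steps. A small sketch of that arithmetic, using hypothetical helper names and the 2754-frame padded input from `path_to_audio`:

```python
import math

def conv_same_output_length(length, strides=2):
    # With "same" padding, the output length depends only on the stride.
    return math.ceil(length / strides)

def downsampled_length(length, num_layers=3, strides=2):
    # Apply the per-layer rule once for each stacked convolution.
    for _ in range(num_layers):
        length = conv_same_output_length(length, strides)
    return length

print(downsampled_length(2754))  # 345
```

Three stride-2 layers shorten the sequence roughly eightfold (2754 to 1377 to 689 to 345), which reduces the cost of the attention layers that follow.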

### **3.2 Transformer Encoder Layer**
We'll now define the Encoder layer. The Encoder maps the embedded input sequence to contextual features using self-attention followed by a feed-forward network.

In [16]:
class TransformerEncoder(layers.Layer):
  def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
    super(TransformerEncoder, self).__init__()
    # Define the attention layer:
    self.attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    # Define the feed-forward layer:
    self.ffn = keras.Sequential(
        [
         layers.Dense(feed_forward_dim, activation="relu"),
         layers.Dense(embed_dim),
        ]
    )
    # Define the normalization layers:
    self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
    # Define the dropout layers:
    self.dropout1 = layers.Dropout(dropout_rate)
    self.dropout2 = layers.Dropout(dropout_rate)

  def call(self, inputs, training):
    # Get the output of the multi-head attention layer:
    attn_output = self.attn(inputs, inputs)
    attn_output = self.dropout1(attn_output, training=training)
    out1 = self.layernorm1(inputs + attn_output)
    # Get the output of the feed-forward layer:
    ffn_output = self.ffn(out1)
    ffn_output = self.dropout2(ffn_output, training=training)
    out2 = self.layernorm2(out1 + ffn_output)
    return out2
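
For intuition about what the attention layer computes, the following is a minimal single-head sketch of scaled dot-product attention in plain Python. It deliberately omits the learned query, key, value, and output projections (and the multiple heads) that `layers.MultiHeadAttention` adds, so it is an illustration rather than an equivalent of the layer above.

```python
import math

def softmax(row):
    # Subtract the maximum for numerical stability before exponentiating.
    m = max(row)
    exps = [math.exp(v - m) for v in row]
    total = sum(exps)
    return [e / total for e in exps]

def scaled_dot_product_attention(queries, keys, values):
    d_k = len(keys[0])
    outputs = []
    for q in queries:
        # Similarity of this query to every key, scaled by sqrt(d_k).
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(d_k) for k in keys]
        weights = softmax(scores)
        # Each output is the attention-weighted average of the value vectors.
        outputs.append(
            [sum(w * v[c] for w, v in zip(weights, values)) for c in range(len(values[0]))]
        )
    return outputs

# Identical keys give uniform weights, so the output is the mean of the values.
print(scaled_dot_product_attention([[1.0, 0.0]],
                                   [[0.0, 0.0], [0.0, 0.0]],
                                   [[2.0, 4.0], [4.0, 8.0]]))  # [[3.0, 6.0]]
```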

### **3.3 Transformer Decoder Layer**
The Decoder maps encoded features to an output sequence.

In [17]:
class TransformerDecoder(layers.Layer):
  def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
    super(TransformerDecoder, self).__init__()
    # Define the self attention layer:
    self.self_attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    # Define the encoder attention layer:
    self.enc_attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    # Define the feed-forward layer:
    self.ffn = keras.Sequential(
        [
         layers.Dense(feed_forward_dim, activation="relu"),
         layers.Dense(embed_dim),
        ]
    )
    # Define the normalization layers:
    self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
    self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
    # Define the dropout layers:
    self.self_dropout = layers.Dropout(0.5)
    self.enc_dropout = layers.Dropout(dropout_rate)
    self.ffn_dropout = layers.Dropout(dropout_rate)

  def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
    '''
    Masks the upper half of the dot product matrix when computing
    self-attention.

    This prevents the flow of information from future tokens to
    the current token.
    '''
    i = tf.range(n_dest)[:,None]
    j = tf.range(n_src)
    m = i >= j - n_src + n_dest
    mask = tf.cast(m, dtype)
    mask = tf.reshape(mask, [1, n_dest, n_src])
    mult = tf.concat(
        [tf.expand_dims(batch_size, axis=-1)
        ,tf.constant([1,1], dtype=tf.int32)]
        ,0
    )
    return tf.tile(mask, mult)

  def call(self, enc_out, target):
    # Shape the input sequence:
    input_shape = tf.shape(target)
    # Get the batch size:
    batch_size = input_shape[0]
    # Get the length of each sequence in the batch:
    seq_len = input_shape[1]
    # Generate the causal attention mask:
    causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
    # Apply the self-attention layer with the mask:
    target_attn = self.self_attn(target, target, attention_mask=causal_mask)
    # Normalize the self-attention output:
    target_norm = self.layernorm1(target + self.self_dropout(target_attn))
    # Apply the encoder attention layer:
    enc_out = self.enc_attn(target_norm, enc_out)
    # Normalize the encoder attention output:
    enc_norm = self.layernorm2(self.enc_dropout(enc_out) + target_norm)
    # Apply the feed-forward layer:
    ffn_out = self.ffn(enc_norm)
    # Normalize the feed-forward output:
    ffn_norm = self.layernorm3(enc_norm + self.ffn_dropout(ffn_out))
    return ffn_norm
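
The comparison used in `causal_attention_mask` can be checked on a small case without TensorFlow. This sketch reproduces the same `i >= j - n_src + n_dest` rule with nested lists; the function name is hypothetical.

```python
def causal_mask(n_dest, n_src):
    # Entry [i][j] is True when destination position i may attend to
    # source position j, following the rule i >= j - n_src + n_dest.
    return [[i >= j - n_src + n_dest for j in range(n_src)] for i in range(n_dest)]

for row in causal_mask(3, 3):
    print(row)
# [True, False, False]
# [True, True, False]
# [True, True, True]
```

When `n_dest` equals `n_src`, as in the decoder's self-attention, the mask is lower triangular: each position sees itself and earlier positions only.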

### **3.4 Full Transformer Model**
We now have the components to assemble our Transformer model.

The model takes audio spectrograms as inputs and predicts a sequence of characters. During training, the decoder receives the target character sequence without its final token and learns to predict the same sequence shifted one position ahead (teacher forcing). During inference, the decoder uses its own past predictions to predict the next token.
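
The slicing used in `train_step` and `test_step` below can be illustrated on a single short sequence. The helper name is hypothetical, and the token IDs assume the `VectorizeChar` vocabulary (2 for `<`, 3 for `>`, 0 for padding).

```python
def teacher_forcing_pair(token_ids):
    # The decoder input drops the last token; the prediction target drops the first.
    return token_ids[:-1], token_ids[1:]

target = [2, 11, 12, 3, 0, 0]  # "<hi>" followed by padding
dec_input, dec_target = teacher_forcing_pair(target)
# Padding positions (index 0) are excluded from the loss.
loss_mask = [t != 0 for t in dec_target]

print(dec_input)   # [2, 11, 12, 3, 0]
print(dec_target)  # [11, 12, 3, 0, 0]
print(loss_mask)   # [True, True, True, False, False]
```

At every position the decoder sees the tokens up to that point and is scored on the token that comes next.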

In [18]:
class Transformer(keras.Model):
  def __init__(self,
               num_hid=64,
               num_heads=2,
               num_feed_forward=128,
               src_max_length=100,
               tgt_max_length=100,
               num_enc_layers=4,
               num_dec_layers=1,
               num_classes=10,
               ):
    super(Transformer, self).__init__()
    self.loss_metric = keras.metrics.Mean(name="loss")
    self.num_enc_layers = num_enc_layers
    self.num_dec_layers = num_dec_layers
    self.tgt_max_length = tgt_max_length
    self.num_classes = num_classes
    self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, max_length=src_max_length)
    self.dec_input = TokenEmbedding(vocab_size=num_classes, max_length=tgt_max_length, num_hid=num_hid)
    self.encoder = keras.Sequential(
        [self.enc_input]
        + [TransformerEncoder(embed_dim=num_hid, 
                              num_heads=num_heads, 
                              feed_forward_dim=num_feed_forward) for _ in range(num_enc_layers)]
    )
    for j in range(num_dec_layers):
      setattr(self,
              f"dec_layer_{j}",
              TransformerDecoder(embed_dim=num_hid, 
                                 num_heads=num_heads, 
                                 feed_forward_dim=num_feed_forward),
      )
    self.classifier = layers.Dense(num_classes)

  def decode(self, enc_out, tgt):
    # Transform the input:
    y = self.dec_input(tgt)
    # Loop through the decoder layers:
    for j in range(self.num_dec_layers):
      # Get the decoder:
      dec = getattr(self,f"dec_layer_{j}")
      # Decode the data:
      y = dec(enc_out, y)
    return y

  def call(self, inp):
    # Get the source sequence in the input data:
    src = inp[0]
    # Get the target sequence in the input data:
    tgt = inp[1]
    # Encode the source data:
    x = self.encoder(src)
    # Decode the target data:
    y = self.decode(x, tgt)
    # Apply the classifier:
    out = self.classifier(y)
    return out
  
  @property
  def metrics(self):
    return [self.loss_metric]
  
  def train_step(self, batch):
    src = batch["source"]
    tgt = batch["target"]
    dec_input = tgt[:,:-1]
    dec_target = tgt[:,1:]
    with tf.GradientTape() as tape:
      preds = self([src, dec_input])
      one_hot = tf.one_hot(dec_target, depth=self.num_classes)
      mask = tf.math.logical_not(tf.math.equal(dec_target,0))
      loss = self.compiled_loss(one_hot, preds, sample_weight=mask)
    trainable_vars = self.trainable_variables
    gradients = tape.gradient(loss, trainable_vars)
    self.optimizer.apply_gradients(zip(gradients, trainable_vars))
    self.loss_metric.update_state(loss)
    return {"loss": self.loss_metric.result()}

  def test_step(self, batch):
    src = batch["source"]
    tgt = batch["target"]
    dec_input = tgt[:,:-1]
    dec_target = tgt[:,1:]
    preds = self([src,dec_input])
    one_hot = tf.one_hot(dec_target, depth=self.num_classes)
    mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
    loss = self.compiled_loss(one_hot, preds, sample_weight=mask)
    self.loss_metric.update_state(loss)
    return {"loss": self.loss_metric.result()}

  def generate(self, source, target_start_token_idx):
    bs = tf.shape(source)[0]
    enc = self.encoder(source)
    dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
    dec_logits = []
    for j in range(self.tgt_max_length - 1):
      dec_out = self.decode(enc, dec_input)
      logits = self.classifier(dec_out)
      logits = tf.argmax(logits, axis=-1, output_type=tf.int32)
      last_logit = tf.expand_dims(logits[:,-1], axis=-1)
      dec_logits.append(last_logit)
      dec_input = tf.concat([dec_input, last_logit], axis=-1)
    return dec_input
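
The control flow of greedy decoding can be sketched independently of the model. In the sketch below, `next_token_fn` is a hypothetical stand-in for running the decoder and taking the argmax of the last position. Unlike `generate` above, which always runs for `tgt_max_length - 1` steps, this version stops as soon as the end token is produced.

```python
def greedy_decode(next_token_fn, start_token, end_token, max_length):
    tokens = [start_token]
    for _ in range(max_length - 1):
        # Feed everything generated so far and keep the most likely next token.
        next_token = next_token_fn(tokens)
        tokens.append(next_token)
        if next_token == end_token:
            break
    return tokens

# Toy "model" that replays a fixed script, used only to exercise the loop.
script = [11, 12, 3, 7, 7]
toy_next_token = lambda tokens: script[len(tokens) - 1]

print(greedy_decode(toy_next_token, start_token=2, end_token=3, max_length=10))
# [2, 11, 12, 3]
```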

## **4. Training the Model**
Our final step is to train the Transformer model. We define a [`Callback`](https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/Callback) subclass to display model predictions every five epochs. We also create a custom [`LearningRateSchedule`](https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/schedules/LearningRateSchedule) that warms the learning rate up linearly and then decays it, which helps stabilize training.

The model weights with the lowest validation loss are saved to a checkpoint path. The validation loss is checked after each training epoch.

In [19]:
class DisplayOutputs(keras.callbacks.Callback):
  def __init__(
      self, batch, idx_to_token, target_start_token_idx=27, target_end_token_idx=28
  ):
    super().__init__()
    self.batch = batch
    self.target_start_token_idx = target_start_token_idx
    self.target_end_token_idx = target_end_token_idx
    self.idx_to_token = idx_to_token

  def on_epoch_end(self, epoch, logs=None):
    if (1 + epoch) % 5 !=0:
      return
    else:
      source = self.batch["source"]
      target = self.batch["target"]
      batch_size = tf.shape(source)[0]
      preds = self.model.generate(source, self.target_start_token_idx)
      preds = preds.numpy()
      for j in range(batch_size):
        target_text = "".join([self.idx_to_token[t] for t in target[j,:]])
        pred = ""
        for idx in preds[j,:]:
          pred += self.idx_to_token[idx]
          if idx == self.target_end_token_idx:
            break
        print("Target:      {}".format(target_text.replace("-","")))
        print("Prediction:  {}\n".format(pred))
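
The index-to-text step inside the callback can be tried on its own. This is a small sketch with a hypothetical helper name, using the same vocabulary layout as `VectorizeChar` and stopping at the end token as the callback does.

```python
# Same vocabulary layout as VectorizeChar.
VOCAB = ["-", "#", "<", ">"] + [chr(i + 96) for i in range(1, 27)] + [" ", ".", ",", "?"]

def ids_to_text(ids, end_token_idx=3):
    chars = []
    for idx in ids:
        chars.append(VOCAB[idx])
        # Stop after the end marker so trailing predictions are ignored.
        if idx == end_token_idx:
            break
    return "".join(chars)

print(ids_to_text([2, 11, 12, 3, 0, 0]))  # <hi>
```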

In [20]:
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
  def __init__(
      self,
      init_lr=1e-5,
      lr_after_warmup=1e-3,
      final_lr=1e-5,
      warmup_epochs=15,
      decay_epochs=85,
      steps_per_epoch=203,
  ):
    super().__init__()
    self.init_lr = init_lr
    self.lr_after_warmup = lr_after_warmup
    self.final_lr = final_lr
    self.warmup_epochs = warmup_epochs
    self.decay_epochs = decay_epochs
    self.steps_per_epoch = steps_per_epoch

  def calculate_lr(self, epoch):
    warmup_lr = self.init_lr + ((self.lr_after_warmup - self.init_lr) / (self.warmup_epochs - 1)) * epoch
    decay_lr = tf.math.maximum(
        self.final_lr,
        self.lr_after_warmup - (epoch - self.warmup_epochs) * (self.lr_after_warmup - self.final_lr) / self.decay_epochs
    )
    return tf.math.minimum(warmup_lr, decay_lr)

  def __call__(self, step):
    # Convert the global step to a float epoch index:
    epoch = tf.cast(step // self.steps_per_epoch, tf.float32)
    return self.calculate_lr(epoch)
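
To see the shape of this schedule without TensorFlow, the same formula can be evaluated in plain Python. The function below is an illustrative re-statement of `calculate_lr` with the class defaults; its name is hypothetical.

```python
def learning_rate(epoch, init_lr=1e-5, lr_after_warmup=1e-3, final_lr=1e-5,
                  warmup_epochs=15, decay_epochs=85):
    # Linear ramp from init_lr, reaching lr_after_warmup at epoch warmup_epochs - 1.
    warmup_lr = init_lr + (lr_after_warmup - init_lr) / (warmup_epochs - 1) * epoch
    # Linear decay from lr_after_warmup down to final_lr, clipped at final_lr.
    decay_lr = max(
        final_lr,
        lr_after_warmup - (epoch - warmup_epochs) * (lr_after_warmup - final_lr) / decay_epochs,
    )
    # Taking the minimum selects the ramp early on and the decay afterwards.
    return min(warmup_lr, decay_lr)

for epoch in (0, 7, 14, 15, 50, 100):
    print(epoch, learning_rate(epoch))
```

With the defaults, the rate starts at `init_lr`, peaks at `lr_after_warmup` around epochs 14 and 15, and settles at `final_lr` once the decay period has elapsed.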

In [21]:
batch = next(iter(test_dataset))

# Set up the callback to display model predictions:
idx_to_char = vectorizer.get_vocabulary()
display_callback = DisplayOutputs(batch,
                                  idx_to_char,
                                  target_start_token_idx=2,
                                  target_end_token_idx=3)

# Set up the callback to save model checkpoints:
checkpoint_path = "./model/training/checkpoints"
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path,
                                                         monitor="val_loss",
                                                         save_freq="epoch",
                                                         mode="min",
                                                         save_weights_only=True,
                                                         save_best_only=True,
                                                         verbose=0)

# Define the transformer model:
model = Transformer(
    num_hid = 200,
    num_heads=2,
    num_feed_forward=400,
    tgt_max_length=max_target_length,
    num_enc_layers=4,
    num_dec_layers=1,
    num_classes=34
)

# Load the model weights if a previous run saved a checkpoint
# (TensorFlow-format checkpoints are written as "<path>.index" plus data shards):
if os.path.exists(checkpoint_path + ".index"):
  model.load_weights(checkpoint_path)

# Define the loss function:
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True, label_smoothing=0.1)

# Define the learning rate schedule:
lr = CustomSchedule(
    init_lr = 5e-5,
    lr_after_warmup=1e-3,
    final_lr=1e-5,
    warmup_epochs=15,
    decay_epochs=135,
    steps_per_epoch=len(train_dataset)
)

# Define the model optimizer:
optimizer = keras.optimizers.Adam(lr)

# Compile the model:
model.compile(optimizer=optimizer, loss=loss)

# Train the model:
hist = model.fit(train_dataset, validation_data=test_dataset, callbacks=[display_callback, checkpoint_callback], epochs=150)

Epoch 1/150

KeyboardInterrupt: ignored