## Setup

Import necessary modules and dependencies. 

In [None]:
import os
import random
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras import models
from IPython import display

# Set the seed value for experiment reproducibility.
seed = 43
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [None]:
print(tf.config.list_physical_devices())

<h1> Export file name </h1>

In [None]:
export_path = r'results/noise_factor_0.0001-0.002 OR shift_rnd data_fixed_100 (35-65_sec)/'
if not os.path.exists(export_path):
    os.makedirs(export_path)

## Import the dataset

The dataset's audio clips are stored in two folders corresponding to each label: `HT` and `WT`:

In [None]:
commands = np.array(tf.io.gfile.listdir(str('split_data_aug_fixed')))
commands = commands[commands != 'desktop.ini']
print('Commands:', commands)

Data folder names:

split_data - all the short files combined with silence into a single file per session <br>
split_data_increased - all the short files combined with silence into multiple files of random length <br>
split_data_const - the combined files from split_data split into constant-length segments (with or without the residue) <br>

Extract the audio clips into a list called `filenames`, and shuffle it:



In [None]:
filenames = tf.io.gfile.glob(str('split_data_aug_fixed') + '/*/*')
filenames = tf.random.shuffle(filenames, seed=seed)
num_samples = len(filenames)
print('Number of total examples:', num_samples)

ht_count = len(tf.io.gfile.listdir(str('split_data_aug_fixed' + '/' + commands[0])))
print('Number of examples for HT label:', ht_count)
wt_count = len(tf.io.gfile.listdir(str('split_data_aug_fixed' + '/' + commands[1])))
print('Number of examples for WT label:', wt_count)

print('Example file tensor:', filenames[12])

In [None]:
from contextlib import redirect_stdout

with open(export_path + 'total_examples.txt', 'w') as f:
    with redirect_stdout(f):
        print(f"""
        Number of total examples: {num_samples}
        Number of examples for HT label: {ht_count}
        Number of examples for WT label: {wt_count}
        """)

Split `filenames` into training, validation and test sets using a 70:15:15 ratio, respectively:

In [None]:
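train_size = int(0.7*num_samples)
# `val_size` is the index where the validation slice ends (85% of the data),
# not the number of validation examples.
val_size = int(0.85*num_samples)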

In [None]:
train_files = filenames[:train_size]
val_files = filenames[train_size:val_size]
test_files = filenames[val_size:]

print('Training set size', len(train_files))
print('Validation set size', len(val_files))
print('Test set size', len(test_files))
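
To see how the two cut points produce a 70:15:15 split, here is a small sketch on a hypothetical list of 200 placeholder items. The count is an assumption chosen for illustration, not the size of this dataset, and the names `train_end` and `val_end` are made up for the example.

```python
# Hypothetical example: 200 placeholder items standing in for file paths.
items = list(range(200))
num_items = len(items)

train_end = int(0.7 * num_items)   # index where the training slice ends
val_end = int(0.85 * num_items)    # index where the validation slice ends

train_part = items[:train_end]
val_part = items[train_end:val_end]
test_part = items[val_end:]

split_sizes = (len(train_part), len(val_part), len(test_part))
print(split_sizes)
```

Because the slices share their boundaries, every item lands in exactly one of the three sets.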

In [None]:
from contextlib import redirect_stdout

with open(export_path + 'train_val_test.txt', 'w') as f:
    with redirect_stdout(f):
        print(f"""
        Training set size: {len(train_files)}
        Validation set size: {len(val_files)}
        Test set size: {len(test_files)}
        """)

## Read the audio files and their labels

In this section you will preprocess the dataset, creating decoded tensors for the waveforms and the corresponding labels. Note that:

- Each WAV file contains time-series data with a set number of samples per second.
- Each sample represents the <a href="https://en.wikipedia.org/wiki/Amplitude" class="external">amplitude</a> of the audio signal at that specific time.
- In a <a href="https://en.wikipedia.org/wiki/Audio_bit_depth" class="external">16-bit</a> system, which is the WAV format that `tf.audio.decode_wav` reads, the amplitude values range from -32,768 to 32,767.
- The <a href="https://en.wikipedia.org/wiki/Sampling_(signal_processing)#Audio_sampling" class="external">sample rate</a> for this dataset is 16kHz.

The shape of the tensor returned by `tf.audio.decode_wav` is `[samples, channels]`, where `channels` is `1` for mono or `2` for stereo. The recordings in this dataset are mono.

In [None]:
# test_file = tf.io.read_file('split_data\\WT\\syllable6.wav')
# test_audio, _ = tf.audio.decode_wav(contents=test_file)
# test_audio.shape

Now, let's define a function that preprocesses the dataset's raw WAV audio files into audio tensors:

In [None]:
def decode_audio(audio_binary):
  # Decode WAV-encoded audio files to `float32` tensors, normalized
  # to the [-1.0, 1.0] range. `decode_wav` returns the audio and a
  # sample rate; the sample rate is discarded here.
  audio, _ = tf.audio.decode_wav(contents=audio_binary)
  # Since all the data is single channel (mono), drop the `channels`
  # axis from the array.
  return tf.squeeze(audio, axis=-1)
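
As a rough illustration of the normalization mentioned in that comment, the sketch below scales 16-bit integer samples into the [-1.0, 1.0] range. Dividing by 32768 is an assumption about how the scaling is done, and the sample values are made up.

```python
# Hypothetical 16-bit PCM samples (made-up values).
int16_samples = [-32768, -16384, 0, 16384, 32767]

# Assumed scaling: divide by 2**15 so the most negative value maps to -1.0.
INT16_SCALE = 32768.0
normalized = [s / INT16_SCALE for s in int16_samples]

print(normalized)
```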

Define a function that creates labels using the parent directories for each file:

- Split the file paths into `tf.RaggedTensor`s (tensors with ragged dimensions—with slices that may have different lengths).

In [None]:
def get_label(file_path):
  parts = tf.strings.split(
      input=file_path,
      sep=os.path.sep)
  # Note: You'll use indexing here instead of tuple unpacking to enable this
  # to work in a TensorFlow graph.
  return parts[-2]
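
The same indexing idea can be tried in plain Python. This is only a sketch: the path and file name below are hypothetical, and the separator is fixed to `/` so the result does not depend on the operating system.

```python
# Hypothetical path; the separator is fixed to "/" so the example
# behaves the same on every platform.
example_path = "split_data_aug_fixed/HT/clip_0001.wav"

parts = example_path.split("/")
# parts[-1] is the file name, parts[-2] is its parent directory.
example_label = parts[-2]

print(example_label)
```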

Define another helper function—`get_waveform_and_label`—that puts it all together:

- The input is the WAV audio filename.
- The output is a tuple containing the audio and label tensors ready for supervised learning.

In [None]:
def get_waveform_and_label(file_path):
  label = get_label(file_path)
  audio_binary = tf.io.read_file(file_path)
  waveform = decode_audio(audio_binary)
  return waveform, label

Build the training set to extract the audio-label pairs:

- Create a `tf.data.Dataset` with `Dataset.from_tensor_slices` and `Dataset.map`, using `get_waveform_and_label` defined earlier.

You'll build the validation and test sets using a similar procedure later on.

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

files_ds = tf.data.Dataset.from_tensor_slices(train_files)

waveform_ds = files_ds.map(
    map_func=get_waveform_and_label,
    num_parallel_calls=AUTOTUNE)

Let's plot a few audio waveforms:

In [None]:
# rows = 1
# cols = 1
# n = rows * cols
# fig, axes = plt.subplots(rows, cols, figsize=(10, 12))

# for i, (audio, label) in enumerate(waveform_ds.take(n)):
#   r = i // cols
#   c = i % cols
#   ax = axes[r][c]
#   ax.plot(audio.numpy())
#   ax.set_yticks(np.arange(-1.2, 1.2, 0.2))
#   label = label.numpy().decode('utf-8')
#   ax.set_title(label)

# plt.show()

## Convert waveforms to spectrograms

The waveforms in the dataset are represented in the time domain. Next, you'll transform the waveforms from time-domain signals into time-frequency-domain signals by computing the <a href="https://en.wikipedia.org/wiki/Short-time_Fourier_transform" class="external">short-time Fourier transform (STFT)</a>, which converts the waveforms into <a href="https://en.wikipedia.org/wiki/Spectrogram" class="external">spectrograms</a>. Spectrograms show frequency changes over time and can be represented as 2D images. You will feed the spectrogram images into your neural network to train the model.

A Fourier transform (`tf.signal.fft`) converts a signal to its component frequencies, but loses all time information. In comparison, STFT (`tf.signal.stft`) splits the signal into windows of time and runs a Fourier transform on each window, preserving some time information, and returning a 2D tensor that you can run standard convolutions on.

Create a utility function for converting waveforms to spectrograms:

- The waveforms need to be of the same length, so that when you convert them to spectrograms, the results have similar dimensions. This can be done by simply zero-padding the audio clips that are shorter than one second (using `tf.zeros`).
- When calling `tf.signal.stft`, choose the `frame_length` and `frame_step` parameters such that the generated spectrogram "image" is almost square. For more information on the STFT parameters choice, refer to <a href="https://www.coursera.org/lecture/audio-signal-processing/stft-2-tjEQe" class="external">this Coursera video</a> on audio signal processing and STFT.
- The STFT produces an array of complex numbers representing magnitude and phase. However, in this tutorial you'll only use the magnitude, which you can derive by applying `tf.abs` on the output of `tf.signal.stft`.

In [None]:
def get_spectrogram(waveform):
  # Truncate to 16,000 samples, then zero-pad any shorter waveform.
  input_len = 16000
  waveform = waveform[:input_len]
  zero_padding = tf.zeros(
      [input_len] - tf.shape(waveform),
      dtype=tf.float32)
  # Cast the waveform tensors' dtype to float32.
  waveform = tf.cast(waveform, dtype=tf.float32)
  # Concatenate the waveform with `zero_padding`, which ensures all audio
  # clips are of the same length.
  equal_length = tf.concat([waveform, zero_padding], 0)
  # Convert the waveform to a spectrogram via a STFT.
  spectrogram = tf.signal.stft(
      equal_length, frame_length=255, frame_step=128)
  # Obtain the magnitude of the STFT.
  spectrogram = tf.abs(spectrogram)
  # Add a `channels` dimension, so that the spectrogram can be used
  # as image-like input data with layers that expect the shape
  # (`batch_size`, `height`, `width`, `channels`).
  spectrogram = spectrogram[..., tf.newaxis]
  return spectrogram
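
The spectrogram shape produced by these settings can be checked by hand. The sketch below assumes that `tf.signal.stft` is called with its defaults, that is, no end padding and an FFT length equal to the smallest power of two that is at least `frame_length`. The helper name `stft_output_shape` is made up for this example.

```python
def stft_output_shape(num_samples, frame_length, frame_step):
    """Return (frames, frequency_bins) for an unpadded STFT."""
    # Number of full windows that fit when sliding by `frame_step`.
    frames = 1 + (num_samples - frame_length) // frame_step
    # Smallest power of two that is >= frame_length.
    fft_length = 1
    while fft_length < frame_length:
        fft_length *= 2
    # A real-valued FFT keeps only the non-negative frequencies.
    bins = fft_length // 2 + 1
    return frames, bins

shape = stft_output_shape(num_samples=16000, frame_length=255, frame_step=128)
print(shape)
```

With 16,000 samples this gives 124 frames of 129 frequency bins, which matches the `input_shape = (124, 129)` used for the model later on.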

Next, start exploring the data. Print the shapes of one example's tensorized waveform and the corresponding spectrogram, and play the original audio:

In [None]:
for waveform, label in waveform_ds.take(1):
  label = label.numpy().decode('utf-8')
  spectrogram = get_spectrogram(waveform)

print('Label:', label)
print('Waveform shape:', waveform.shape)
print('Spectrogram shape:', spectrogram.shape)
print('Audio playback')
display.display(display.Audio(waveform, rate=16000))

Now, define a function for displaying a spectrogram:

In [None]:
def plot_spectrogram(spectrogram, ax):
  if len(spectrogram.shape) > 2:
    assert len(spectrogram.shape) == 3
    spectrogram = np.squeeze(spectrogram, axis=-1)
  # Convert the frequencies to log scale and transpose, so that the time is
  # represented on the x-axis (columns).
  # Add an epsilon to avoid taking a log of zero.
  log_spec = np.log(spectrogram.T + np.finfo(float).eps)
  height = log_spec.shape[0]
  width = log_spec.shape[1]
  X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
  Y = range(height)
  ax.pcolormesh(X, Y, log_spec)

Plot the example's waveform over time and the corresponding spectrogram (frequencies over time):

In [None]:
fig, axes = plt.subplots(2, figsize=(12, 8))
timescale = np.arange(waveform.shape[0])
axes[0].plot(timescale, waveform.numpy())
axes[0].set_title('Waveform')
axes[0].set_xlim([0, 16000])

plot_spectrogram(spectrogram.numpy(), axes[1])
axes[1].set_title('Spectrogram')
plt.show()

Now, define a function that transforms the waveform dataset into spectrograms and their corresponding labels as integer IDs:

In [None]:
def get_spectrogram_and_label_id(audio, label):
  spectrogram = get_spectrogram(audio)
  label_id = tf.argmax(label == commands)
  return spectrogram, label_id
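
The label lookup compares the label against every entry of `commands` and takes the position of the match. A plain-Python sketch of the same idea follows; the list order `["HT", "WT"]` is an assumption about how the folders are listed.

```python
# Mimic `tf.argmax(label == commands)` with plain Python.
commands_list = ["HT", "WT"]      # assumed order of the label folders
label = "WT"

matches = [label == c for c in commands_list]   # one boolean per command
label_id = matches.index(True)                  # position of the first match

print(label_id)
```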

Map `get_spectrogram_and_label_id` across the dataset's elements with `Dataset.map`:

In [None]:
spectrogram_ds = waveform_ds.map(
  map_func=get_spectrogram_and_label_id,
  num_parallel_calls=AUTOTUNE)

Examine the spectrograms for different examples of the dataset:

In [None]:
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 10))

for i, (spectrogram, label_id) in enumerate(spectrogram_ds.take(n)):
  r = i // cols
  c = i % cols
  ax = axes[r][c]
  plot_spectrogram(spectrogram.numpy(), ax)
  ax.set_title(commands[label_id.numpy()])
  ax.axis('off')

plt.show()

## Build and train the model

Repeat the training set preprocessing on the validation and test sets:

In [None]:
def preprocess_dataset(files):
  files_ds = tf.data.Dataset.from_tensor_slices(files)
  output_ds = files_ds.map(
      map_func=get_waveform_and_label,
      num_parallel_calls=AUTOTUNE)
  output_ds = output_ds.map(
      map_func=get_spectrogram_and_label_id,
      num_parallel_calls=AUTOTUNE)
  return output_ds

In [None]:
train_ds = spectrogram_ds
val_ds = preprocess_dataset(val_files)
test_ds = preprocess_dataset(test_files)

Batch the training and validation sets for model training:

In [None]:
batch_size = 128
train_ds = train_ds.batch(batch_size)
val_ds = val_ds.batch(batch_size)

Add `Dataset.cache` and `Dataset.prefetch` operations to reduce read latency while training the model:

In [None]:
train_ds = train_ds.cache().prefetch(AUTOTUNE)
val_ds = val_ds.cache().prefetch(AUTOTUNE)

For the model, you'll use a recurrent neural network rather than a convolutional one. The spectrogram is treated as a sequence of 124 time frames with 129 frequency bins each:

- `tf.keras.layers.LSTM`: reads the frames in order and returns one output per frame (`return_sequences=True`).
- `tf.keras.layers.TimeDistributed` and `tf.keras.layers.Dense`: transform each frame's output before the sequence is flattened.
- `tf.keras.layers.Dropout`: regularizes the dense layers, ahead of a final softmax over the two labels.

The next cell reads the input shape from one training example and the number of labels from `commands`; the model cell further down sets both explicitly.

In [None]:
for spectrogram, _ in spectrogram_ds.take(1):
  input_shape = spectrogram.shape
print('Input shape:', input_shape)
num_labels = len(commands)

In [None]:
# scheduler no. 1
from keras.callbacks import LearningRateScheduler

# This is a sample of a scheduler I used in the past
def lr_scheduler(epoch, lr):
    decay_rate = 0.85
    decay_step = 5
    if epoch % decay_step == 0 and epoch:
        return lr * pow(decay_rate, np.floor(epoch / decay_step))
    return lr

In [None]:
# scheduler no. 2
from keras.callbacks import LearningRateScheduler

def scheduler(epoch, lr):
  if epoch < 15:
    return lr
  else:
    return lr * tf.math.exp(-0.07)
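
To get a feel for scheduler no. 2, the sketch below replays the same rule with `math.exp` in place of `tf.math.exp`. The starting rate of 0.001 is an assumption (it is the usual Adam default), and `simulate_schedule` is a helper made up for this example.

```python
import math

def simulate_schedule(initial_lr, num_epochs, hold_epochs=15, decay=0.07):
    """Replay the hold-then-decay rule and return the rate used per epoch."""
    lr = initial_lr
    rates = []
    for epoch in range(num_epochs):
        if epoch >= hold_epochs:
            lr = lr * math.exp(-decay)   # shrink by about 6.8% per epoch
        rates.append(lr)
    return rates

rates = simulate_schedule(initial_lr=0.001, num_epochs=100)
print(f"epoch 0: {rates[0]:.6f}, epoch 99: {rates[-1]:.8f}")
```

The rate stays flat for the first 15 epochs and then decays geometrically, so late epochs take much smaller steps.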


In [None]:
# scheduler no. 3
from keras.callbacks import LearningRateScheduler

# This is a sample of a scheduler I used in the past
def new_lr_scheduler(epoch, lr):
    decay_rate = 0.5
    decay_step = 1
    if epoch % decay_step == 0 and epoch:
        return lr * tf.math.exp(-(decay_rate / epoch))
    return lr

In [None]:
total_count = ht_count + wt_count
weight_for_0 = (1 / ht_count)*(total_count)/2.0
weight_for_1 = (1 / wt_count)*(total_count)/2.0

print('Weight for class 0 (HT): {:.2f}'.format(weight_for_0))
print('Weight for class 1 (WT): {:.2f}'.format(weight_for_1))
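
The weighting rule above is the same as `total / (num_classes * class_count)`. A small sketch with hypothetical counts shows its effect; the counts and the helper name `balanced_class_weights` are made up for illustration.

```python
def balanced_class_weights(counts):
    """Weight each class by total / (num_classes * class_count)."""
    total = sum(counts)
    num_classes = len(counts)
    return [total / (num_classes * c) for c in counts]

# Hypothetical counts: 300 HT clips and 100 WT clips.
weights = balanced_class_weights([300, 100])
print(weights)
```

The rarer class gets the larger weight, and the weighted example count stays equal to the total number of examples.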

In [None]:
num_labels = 2
input_shape = (124, 129)

model = models.Sequential(
    [
        layers.Input(shape=input_shape),
        layers.LSTM(4096, return_sequences=True),
        layers.TimeDistributed(layers.Dense(32, activation="relu")),
        layers.Dense(64, activation="relu"),
        layers.Flatten(),
        layers.Dropout(0.45),
        layers.Dense(64, activation="relu"),
        layers.Dense(32, activation="relu"),
        layers.Flatten(),
        layers.Dropout(0.45),
        layers.Dense(num_labels, activation="softmax"),
    ]
)

model.summary()

<h1> Plot Model </h1>

In [None]:
# from keras.utils import plot_model
# plot_model(model)

Configure the Keras model with the Adam optimizer and the cross-entropy loss:

In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=['accuracy'],
)

Train the model for up to 100 epochs, using class weights, a learning-rate schedule, and early stopping on the validation loss:

In [None]:
EPOCHS = 100
CLASS_WEIGHT = {0: weight_for_0, 1: weight_for_1}
CALLBACKS = [LearningRateScheduler(scheduler, verbose=1), tf.keras.callbacks.EarlyStopping(monitor='val_loss', verbose=1, patience=60)]
#CALLBACKS = [LearningRateScheduler(lr_scheduler, verbose=1), tf.keras.callbacks.EarlyStopping(monitor='val_loss', verbose=1, patience=40)]
#CALLBACKS = [LearningRateScheduler(new_lr_scheduler, verbose=1), tf.keras.callbacks.EarlyStopping(monitor='val_loss', verbose=1, patience=40)]
#CALLBACKS = [tf.keras.callbacks.EarlyStopping(monitor='val_loss', verbose=1, patience=40)]


history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=CALLBACKS,
    class_weight=CLASS_WEIGHT,
)

Let's plot the training and validation accuracy and loss curves to check how your model has improved during training:

In [None]:
# list all data in history
print(history.history.keys())
plt.style.use('default')

# summarize history for accuracy
plt.figure(facecolor='white')
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='lower right')
plt.savefig(export_path + 'accuracy.png')
plt.show()


# summarize history for loss
plt.figure(facecolor='white')
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper right')
plt.savefig(export_path + 'loss.png')
plt.show()


## Evaluate the model performance

Run the model on the test set and check the model's performance:

In [None]:
test_audio = []
test_labels = []

for audio, label in test_ds:
  test_audio.append(audio.numpy())
  test_labels.append(label.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)

In [None]:
y_pred = np.argmax(model.predict(test_audio), axis=1)
y_true = test_labels

test_acc = sum(y_pred == y_true) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')

In [None]:
from contextlib import redirect_stdout

with open(export_path + 'test_accuracy.txt', 'w') as f:
    with redirect_stdout(f):
        print(f"""
        Test set accuracy: {test_acc:.0%}
        """)

### Display a confusion matrix

Use a <a href="https://developers.google.com/machine-learning/glossary#confusion-matrix" class="external">confusion matrix</a> to check how well the model classified each of the two labels in the test set:


In [None]:
from sklearn.metrics import confusion_matrix

# Get the confusion matrix
cm  = confusion_matrix(y_true, y_pred)

# Row totals: the number of true HT and true WT examples.
HT_ROW_TOTAL = cm.flatten()[0] + cm.flatten()[1]
WT_ROW_TOTAL = cm.flatten()[2] + cm.flatten()[3]

row_percentages = [cm.flatten()[0]/HT_ROW_TOTAL, cm.flatten()[1]/HT_ROW_TOTAL, cm.flatten()[2]/WT_ROW_TOTAL, cm.flatten()[3]/WT_ROW_TOTAL]
row_percentages = ["{0:.2%}".format(value) for value in row_percentages]
group_counts = ["({0:0.0f})".format(value) for value in cm.flatten()]

labels = [f"{v1} {v2}" for v1, v2 in zip(row_percentages, group_counts)]

labels = np.asarray(labels).reshape(2,2)
plt.figure(figsize=(10, 8), facecolor='white')
sns.set(font_scale=1.6)
ax = sns.heatmap(cm, annot=labels, fmt='', xticklabels=commands, yticklabels=commands)
ax.set_title('Confusion Matrix:\n\n')
ax.set_xlabel('\nPredicted Values')
ax.set_ylabel('Actual Values ')
## Display the visualization of the Confusion Matrix.
plt.savefig(export_path + 'confusion_matrix.png')
plt.show()
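
The percentages in the heatmap are row-normalized: each cell is divided by the number of true examples in its row. A minimal sketch with a made-up 2x2 matrix follows; the counts are hypothetical and are not results from this model.

```python
# Hypothetical 2x2 confusion matrix: rows are true labels, columns are predictions.
cm_example = [[40, 10],
              [5, 45]]

# Divide every cell by the total of its row.
row_fractions = [[cell / sum(row) for cell in row] for row in cm_example]

for row in row_fractions:
    print(["{0:.2%}".format(value) for value in row])
```

Read this way, the diagonal cells are the per-class recall.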


In [None]:
# precision and recall calculation

from sklearn.metrics import classification_report

target_names = ['HT', 'WT']

print(classification_report(y_true, y_pred, target_names=target_names))

In [None]:
from contextlib import redirect_stdout

with open(export_path + 'classification_report.txt', 'w') as f:
    with redirect_stdout(f):
        print(classification_report(y_true, y_pred, target_names=target_names))

## Run inference on an audio file

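Finally, check the model's prediction on a single audio file drawn at random from the first entries of `filenames`. Because `filenames[:train_size]` is the training split, the drawn file was most likely seen during training, so treat this as a sanity check rather than an evaluation. How well does your model perform?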

In [None]:
import random
sample_file = filenames[random.randint(0,100)].numpy().decode("utf-8")
sample_file

In [None]:
sample_ds = preprocess_dataset([str(sample_file)])

for spectrogram, label in sample_ds.batch(1):
  prediction = model(spectrogram)
  plt.figure(facecolor='white')
  # The model already ends in a softmax layer, so plot its output directly.
  plt.bar(commands, prediction[0].numpy())
  plt.title(f'Predictions for "{commands[label[0]]}"')
  plt.show()

The tallest bar should correspond to the true label shown in the plot title.

## Next steps

This tutorial demonstrated how to carry out simple audio classification using a recurrent (LSTM) neural network with TensorFlow and Python. To learn more, consider the following resources:

- The [Sound classification with YAMNet](https://www.tensorflow.org/hub/tutorials/yamnet) tutorial shows how to use transfer learning for audio classification.
- The notebooks from <a href="https://www.kaggle.com/c/tensorflow-speech-recognition-challenge/overview" class="external">Kaggle's TensorFlow speech recognition challenge</a>.
- The <a href="https://codelabs.developers.google.com/codelabs/tensorflowjs-audio-codelab/index.html#0" class="external">TensorFlow.js - Audio recognition using transfer learning codelab</a> teaches how to build your own interactive web app for audio classification.
- <a href="https://arxiv.org/abs/1709.04396" class="external">A tutorial on deep learning for music information retrieval</a> (Choi et al., 2017) on arXiv.
- TensorFlow also has additional support for [audio data preparation and augmentation](https://www.tensorflow.org/io/tutorials/audio) to help with your own audio-based projects.
- Consider using the <a href="https://librosa.org/" class="external">librosa</a> library—a Python package for music and audio analysis.

In [None]:
import winsound

def make_noise():
  winsound.Beep(440, 180)
  winsound.Beep(550, 180)
  winsound.Beep(440, 180)
  winsound.Beep(550, 180)


make_noise()