In [19]:
import os
import datetime
from pathlib import Path

import pandas as pd

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_io as tfio

import IPython.display as ipd

In [2]:
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
yamnet_model = hub.load(yamnet_model_handle)

In [3]:
label_classes = ['spoof', 'bonafide']
map_class_to_id = {'spoof':0, 'bonafide':1}

In [4]:
@tf.function
def load_wav_16k_mono(filename):
    """ Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio. """
    file_contents = tf.io.read_file(filename)
    wav, sample_rate = tf.audio.decode_wav(
          file_contents,
          desired_channels=1)
    wav = tf.squeeze(wav, axis=-1)
    sample_rate = tf.cast(sample_rate, dtype=tf.int64)
    wav = tfio.audio.resample(wav, rate_in=sample_rate, rate_out=16000)
    return wav

In [5]:
def read_csv(path, base_data_path):
    pd_data = pd.read_csv(path)

    filtered_pd = pd_data[pd_data.category.isin(label_classes)]

    class_id = filtered_pd['category'].apply(lambda name: map_class_to_id[name])
    filtered_pd = filtered_pd.assign(target=class_id)

    full_path = filtered_pd['filename'].apply(lambda row: os.path.join(base_data_path, row))
    filtered_pd = filtered_pd.assign(filename=full_path)

    filenames = filtered_pd['filename']
    targets = filtered_pd['target']

    return tf.data.Dataset.from_tensor_slices((filenames, targets)), len(targets)

In [6]:
def load_wav_for_map(filename, label):
    return load_wav_16k_mono(filename), label, 1

In [7]:
# applies the embedding extraction model to a wav data
def extract_embedding(wav_data, label, fold):
    scores, embeddings, spectrogram = yamnet_model(wav_data)
    num_embeddings = tf.shape(embeddings)[0]
    return (embeddings,
            tf.repeat(label, num_embeddings),
            tf.repeat(fold, num_embeddings))

In [8]:
def load_ds(csv, folder):
    ds, ds_len = read_csv(csv, folder)
    ds = ds.map(load_wav_for_map)
    ds = ds.map(extract_embedding).unbatch()
    return ds.map(lambda embedding, label, fold: (embedding, label)).cache(), ds_len

# Run from here for training the model

In [9]:
cached_ds, ds_length = load_ds('train.csv', 'train')
train_size = int(ds_length * 0.8)

train_ds = cached_ds.take(train_size)
val_ds = cached_ds.skip(train_size)

train_ds = train_ds.cache().shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
val_ds = val_ds.cache().batch(32).prefetch(tf.data.AUTOTUNE)

Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089


Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089






In [49]:
test_ds, _ = load_ds('test.csv', 'test')
test_ds = test_ds.batch(32).prefetch(tf.data.AUTOTUNE)

In [11]:
feature_classifier = tf.keras.Sequential([
    tf.keras.layers.Input(shape=1024, dtype=tf.float32,
                          name='input_embedding'),
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(len(label_classes))
], name='feature_classifier')

feature_classifier.summary()

Model: "feature_classifier"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 512)               524800    
                                                                 
 dense_1 (Dense)             (None, 2)                 1026      
                                                                 
Total params: 525,826
Trainable params: 525,826
Non-trainable params: 0
_________________________________________________________________


In [14]:
feature_classifier.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                 optimizer="adam",
                 metrics=['accuracy'])

log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y %m %d-%H %M %S")

checkpoint_filepath = "checkpoint/"

early_stopping_callback = tf.keras.callbacks.EarlyStopping(monitor='loss',
                                            patience=3,
                                            restore_best_weights=True)

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True)

callbacks = [early_stopping_callback, tensorboard_callback, model_checkpoint_callback]

In [15]:
# if you want to load the weights from a checkpoint
if Path(checkpoint_filepath).exists():
    feature_classifier.load_weights(checkpoint_filepath)

history = feature_classifier.fit(train_ds,
                       epochs=50,
                       validation_data=val_ds,
                       callbacks=callbacks)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50


In [50]:
feature_classifier.load_weights(checkpoint_filepath)
loss, accuracy = feature_classifier.evaluate(test_ds)

print("Loss: ", loss)
print("Accuracy: ", accuracy)

Loss:  1.2470332384109497
Accuracy:  0.8721886873245239


# Start running from here for running the trained model

In [17]:
class ReduceMeanLayer(tf.keras.layers.Layer):
  def __init__(self, axis=0, **kwargs):
    super(ReduceMeanLayer, self).__init__(**kwargs)
    self.axis = axis

  def call(self, inp):
    return tf.math.reduce_mean(inp, axis=self.axis)

In [20]:
saved_model_path = 'model'

input_segment = tf.keras.layers.Input(shape=(), dtype=tf.float32, name='audio')
embedding_extraction_layer = hub.KerasLayer(yamnet_model_handle,
                                            trainable=False, name='yamnet')
_, embeddings_output, _ = embedding_extraction_layer(input_segment)
serving_outputs = feature_classifier(embeddings_output)
serving_outputs = ReduceMeanLayer(axis=0, name='classifier')(serving_outputs)
serving_model = tf.keras.Model(input_segment, serving_outputs)

if not Path(saved_model_path).exists():
    serving_model.save(saved_model_path, include_optimizer=False)





INFO:tensorflow:Assets written to: model\assets


INFO:tensorflow:Assets written to: model\assets


In [28]:
tf.keras.utils.plot_model(serving_model)

You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.


In [29]:
reloaded_model = tf.saved_model.load(saved_model_path)

Change the path to the wav file you want to test

In [51]:
wav_path = 'test/E_1000001.wav'

testing_wav_data = load_wav_16k_mono(wav_path)
ipd.Audio(testing_wav_data, rate=16000)

In [52]:
serving_results = reloaded_model.signatures['serving_default'](testing_wav_data)
real_or_fake = label_classes[tf.math.argmax(serving_results['classifier'])]
print(f'The input sound is: {real_or_fake}')


The input sound is: spoof
