<a href="https://colab.research.google.com/github/keysmusician/MockingBot/blob/main/MockingBot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MockingBot
This is my portfolio project for Holberton School.

MockingBot is a generative model that aims to produce original audio that resembles its training data without reproducing it exactly.

In [None]:
'''
Prepares dependencies.

Note: If the runtime disconnects, reconnecting may not cause dependencies to 
load correctly. In that event, restart the runtime, then retry execution.
'''
# Install TensorFlow I/O so 24-bit WAV files can be opened. For some reason,
# the preinstalled TensorFlow and TensorFlow I/O have to be removed for
# libtensorflow_io.so to work.
!pip uninstall tensorflow -yq # -yq: yes, quiet
!pip uninstall tensorflow-io -yq
!pip install tensorflow-gpu -q
!pip install --no-deps tensorflow-io -q


# Datasets

This section prepares the various datasets I will be using.

I currently have two datasets available:
- Kicks: 856 24-bit ?-kHz stereo kick drum samples
- Meows: 440 16-bit 8-kHz mono cat meows from 2 breeds in 3 situation categories (grooming, feeding, exploring). Source: https://zenodo.org/record/4008297#.YwC5xPHMJAd
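
The bit depth, sample rate, and channel count of a dataset can be checked directly from the WAV headers. The following is a minimal sketch using only the standard-library `wave` module; `survey_wav_folder` is a hypothetical helper that is not part of this notebook, and it only counts files that `wave` cannot parse rather than repairing them.

```python
import os
import wave
from collections import Counter


def survey_wav_folder(folder):
    '''
    Tallies (bit depth, sample rate, channel count) over the WAV files in
    `folder`. Hypothetical helper for illustration only.

    Returns: (Counter of format tuples, list of unreadable file names).
    '''
    formats = Counter()
    unreadable = []

    for filename in sorted(os.listdir(folder)):
        if not filename.lower().endswith('.wav'):
            continue
        try:
            with wave.open(os.path.join(folder, filename), 'rb') as wav:
                formats[(
                    wav.getsampwidth() * 8,  # Bit depth
                    wav.getframerate(),      # Sample rate in Hz
                    wav.getnchannels()       # Channel count
                )] += 1
        except (wave.Error, EOFError):
            # `wave` only reads a subset of WAV encodings
            unreadable.append(filename)

    return formats, unreadable
```

Calling `survey_wav_folder` on a dataset folder returns one count per distinct format, which makes inconsistently encoded files easy to spot.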

TODO:
I need data from a wider range of sources. I currently have about 900 kick drum
samples. I may want datasets of:
- Cricket chirps
- A bird call (how appropriate)
- Katydid songs
- Cow moos
- Bullfrog croaks
- Any short and simple animal call, etc.

I think three sources should suffice.


In [None]:
'''
Makes datasets available.
'''
from google.colab import drive
import os


drive.mount('/content/drive/')

DATASETS_PATH = '/content/drive/Shareddrives/MockingBot/'

# Show available datasets:
print('Datasets:')

for dataset_name in os.listdir(DATASETS_PATH):
    print(dataset_name)


# Uncomment the following if you'd like to upload a dataset manually.
# You'll have to update the dataset path to point to your custom location:

# from google.colab import files
# files.upload()

Mounted at /content/drive/
Datasets:
Kicks
Meows
Output
.ipynb_checkpoints
Models


In [None]:
'''
Sets the project's dataset.
'''
# Choose the desired dataset here.
# `DATASET_NAME` can be the name of any folder inside `DATASETS_PATH`:
# NOTE: "Kicks" actually does not work because the WAV files can be saved in
# various structures, not all of which are supported by TensorFlow. The files
# in "Kicks" are not all in a consistent compatible structure.
DATASET_NAME = 'Meows'

# The path to the selected dataset:
DATASET_PATH = f'{DATASETS_PATH}{DATASET_NAME}/'

In [None]:
'''
Converts WAV files to be compatible with TensorFlow.

This cell is very incomplete.
'''

# file_count = len(training_filenames)
# for i, filename in enumerate(training_filenames[:100]):
#   print(f'{i}/{file_count} ({i / file_count * 100:.3}%)')
#   try:
#     test_audio_tensor = tfio.audio.decode_wav(
#       input=tf.io.read_file(DATASET_PATH + filename),
#       dtype=tf.int16
#     )
#     print(f'PASS: {filename}')
#   except:
#     try:
#       test_audio_tensor = tfio.audio.decode_wav(
#         input=tf.io.read_file(DATASET_PATH + filename),
#         dtype=tf.int32
#       )
#       print(f'PASS: {filename}')
#     except:
#       print(f'FAIL:{filename}')
#       
# with wave.open(DATASET_PATH + 'SD_Kick_Jelly.wav', 'wb') as wav:
#   wav.setnchannels(2)
#   wav.setsampwidth(2)
#   wav.setframerate(44100)
#   # bit_depth = wav.getsampwidth() * 8
#   # sample_rate = wav.getframerate()
# 
# print(bit_depth, sample_rate)
# 
# tfio.audio.decode_wav(
#       input=tf.io.read_file(DATASET_PATH + 'SD_Kick_Jelly.wav'),
#       dtype=tf.int16
# )


In [None]:
'''
Loads and plots a random WAV file from the dataset.
'''
import IPython
import matplotlib.pyplot as plt
import random
import tensorflow as tf
import tensorflow_io as tfio
import wave


filenames = [
    filename for filename in os.listdir(DATASET_PATH)
    if filename.endswith('.wav')
]

test_filename = random.choice(filenames)

print('File name:', test_filename)

test_file_path = DATASET_PATH + test_filename

with wave.open(test_file_path) as wav:
    bit_depth = wav.getsampwidth() * 8
    sample_rate = wav.getframerate()

print('Bit depth:', bit_depth)

print('Sample rate:', sample_rate)

test_audio_tensor = tfio.audio.decode_wav(
    input=tf.io.read_file(test_file_path),
    # If this line errors, you may need to restart the runtime
    dtype=tf.int16 if bit_depth == 16 else tf.int32
)

print('Tensor dimensions:', test_audio_tensor.shape)

plt.plot(test_audio_tensor)

IPython.display.display(IPython.display.Audio(test_file_path, rate=sample_rate))

ModuleNotFoundError: ignored

# Input Pipeline
The input pipeline for this project flows as follows:
1. Construction. Audio tensors are constructed from each WAV file specified.
2. Mono conversion. Audio tensors are converted to mono. Channel 1 (the right channel of a stereo file) is dropped if it exists. The resulting tensor has one axis of variable length.
3. Amplitude normalization. Signals are amplitude normalized to fully utilize the amplitude space.
4. Min-max scaling. Tensors are scaled between -1 and 1.
5. Short-time Fourier Transform. Frequency information is extracted using STFT.
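
The steps above can be sketched on a synthetic signal. This is a NumPy illustration under assumptions, not the notebook's TensorFlow pipeline: `to_mono`, `peak_normalize`, and `stft_magnitude` are hypothetical helpers, the input is a made-up stereo tone, and the framing (periodic Hann window, no end padding) is meant to mirror the defaults of `tf.signal.stft` rather than reproduce it exactly.

```python
import numpy as np


def to_mono(audio):
    ''' Keeps channel 0 of a (samples, channels) array. '''
    return audio[:, 0] if audio.ndim == 2 else audio


def peak_normalize(signal):
    ''' Scales a signal so its largest absolute value is 1. '''
    peak = np.max(np.abs(signal))
    return signal / peak if peak > 0 else signal


def stft_magnitude(signal, frame_length=2048, frame_step=50):
    ''' Magnitude spectrogram with shape (frames, frame_length // 2 + 1). '''
    frame_count = 1 + (len(signal) - frame_length) // frame_step
    window = 0.5 - 0.5 * np.cos(
        2 * np.pi * np.arange(frame_length) / frame_length)  # Periodic Hann
    frames = np.stack([
        signal[i * frame_step:i * frame_step + frame_length] * window
        for i in range(frame_count)
    ])
    return np.abs(np.fft.rfft(frames, axis=-1))


# Synthetic stereo input: 11,000 samples of a 1 kHz tone at 8 kHz
time_steps = np.arange(11_000)
tone = 0.25 * np.sin(2 * np.pi * 1_000 * time_steps / 8_000)
stereo = np.stack([tone, np.zeros_like(tone)], axis=1)

spectrogram = stft_magnitude(peak_normalize(to_mono(stereo)))

# Scale the magnitudes to a maximum of 1, then flatten into a feature vector
features = (spectrogram / spectrogram.max()).reshape(-1)

print(spectrogram.shape)  # (180, 1025)
```

With 11,000 samples, a frame length of 2048, and a frame step of 50, the spectrogram has 180 frames of 1025 frequency bins each, so the flattened feature vector has 184,500 values.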

If I want to simplify the problem, I can:
- Pad tensors to equal length
- Downsample
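
Both simplifications can be sketched in NumPy. `pad_to_length` and `downsample_by_averaging` are hypothetical helpers for illustration; averaging adjacent samples is only a crude anti-aliasing filter, chosen to keep the sketch short, and a proper resampler would likely be preferable for real audio.

```python
import numpy as np


def pad_to_length(signal, length):
    ''' Zero-pads or truncates a 1D signal to exactly `length` samples. '''
    signal = signal[:length]
    return np.pad(signal, (0, length - len(signal)))


def downsample_by_averaging(signal, factor):
    ''' Averages each run of `factor` samples into one sample. '''
    usable = len(signal) - len(signal) % factor  # Drop the ragged tail
    return signal[:usable].reshape(-1, factor).mean(axis=1)


# Pad clips of different lengths so they can be stacked into one array
clips = [np.ones(5), np.ones(8), np.ones(3)]
longest = max(len(clip) for clip in clips)
batch = np.stack([pad_to_length(clip, longest) for clip in clips])
print(batch.shape)  # (3, 8)

# Halve the sample rate of a short ramp
halved = downsample_by_averaging(np.arange(10, dtype=np.float32), 2)
print(halved)  # [0.5 2.5 4.5 6.5 8.5]
```

Equal-length inputs give the model a fixed input dimension, and downsampling shrinks that dimension at the cost of high-frequency content.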

In [None]:
'''
Builds and preprocesses the dataset.
'''


def load_wav(file_path):
    '''
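    Loads a WAV file as a tensor.

    Stereo files will be flattened to be mono by taking channel 0. The audio
    is padded or truncated to 13,000 samples, then the first 2,000 samples
    are discarded.

    file_path: The path of a WAV file.

    Returns: `tf.Tensor` of 11,000 `tf.float32` samples.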
    '''
    # TFIO decoding
    # audio = tfio.audio.decode_wav(
    #   input=tf.io.read_file(file_path),
    #   dtype=tf.int16 if bit_depth == 16 else tf.int32
    # )
    #
    # Flatten to mono if necessary and remove the channel axis
    # return audio[:, 0]

    # TF decoding
    audio, _ = tf.audio.decode_wav(
        contents=tf.io.read_file(file_path),
        desired_channels=1,
        desired_samples=13_000
    )

    return tf.squeeze(audio)[2_000:]


def normalize(audio_tensor):
    '''
    Normalizes an audio signal.

    Scales an audio signal to entirely fill the range -1 to 1.

    audio_tensor: A tensor of audio data.

    Returns: `tf.float32` Normalized audio tensor.
    '''
    data_type_max = audio_tensor.dtype.max

    tensor_max = tf.reduce_max(tf.abs(audio_tensor))

    # Ensure `tensor_max` is non-zero to avoid arithmetic error
    scaling_factor = tf.cast(data_type_max / tensor_max, tf.float32)\
        if tensor_max != 0 else 1.0

    return tf.cast(audio_tensor, tf.float32) * scaling_factor / data_type_max


def input_pipeline(file_path):
    '''
    Performs dataset processing.

    file_path: A string tensor containing the name of a WAV file in the dataset.

    Returns: A 2D tensor of audio features (see STFT).
    '''
    spectrogram_vector = tf.reshape(
        tf.abs(
            tf.signal.stft(
                # TF decoding does normalization (TFIO does not):
                # normalize(load_wav(file_path)), 
                load_wav(file_path),
                frame_length=2048,
                frame_step=50
            )
        ),
        [-1]  # Flatten the spectrogram
    )

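    # Named `peak` to avoid shadowing the built-in `max`. A silent clip has a
    # zero peak, so the division yields NaN; such examples are filtered out
    # when the dataset is built.
    peak = tf.reduce_max(spectrogram_vector)

    return spectrogram_vector / peak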


# Build the dataset from every WAV file in the dataset folder, dropping any
# example that contains NaN or infinite values:
training_dataset = tf.data.Dataset.list_files(
    file_pattern=DATASET_PATH + '*.wav',
    shuffle=True,
    seed=0
).map(
    map_func=input_pipeline,
    num_parallel_calls=tf.data.AUTOTUNE
).filter(
    lambda training_example:
        tf.reduce_all(tf.math.is_finite(training_example))
)

# Test the dataset:
for training_example in training_dataset:
    # Confirm there are no NaN's or inf's in the dataset:
    tf.debugging.assert_all_finite(training_example, str(training_example))

    # Confirm the values are normalized
    if not tf.experimental.numpy.isclose(
            tf.reduce_max(training_example).numpy(), 1):
        raise ValueError(f'Tensor is not normalized: {training_example}')

    

In [None]:
''' 
Creates a mock dataset of sine waves at various frequencies.
'''
import numpy as np



def generate_mock_dataset():
    training_example_count = 60_000

    tau = np.pi * 2

    time_steps = np.arange(0, 11_000)

    sample_rate_Hz = 8_000

    max_frequency_Hz = sample_rate_Hz / 2

    min_frequency_Hz = 40

    frequency_range_ratio = max_frequency_Hz / min_frequency_Hz

    frequency_steps = 10

    amplitude = 1

    signals = []
    
    for frequency_step in range(frequency_steps):

        signal_frequency_Hz = min_frequency_Hz * frequency_range_ratio ** (
            frequency_step / frequency_steps)

        signal = amplitude * np.sin(
            tau * signal_frequency_Hz * time_steps / sample_rate_Hz, 
            dtype=np.float32
        )

        signals.append(signal)
        
    signals = np.stack(signals, 0)

    # Generate multiple copies of the data so I can train for fewer epochs
    signals = np.repeat(signals, 100, 0)

    def mock_input_pipeline(signal):
        '''
        Performs dataset processing.

        signal: A tensor containing an audio signal.

        Returns: A 2D tensor of audio features (see STFT).
        '''
        spectrogram_vector = tf.reshape(
            tf.abs(
                tf.signal.stft(
                    signal,
                    frame_length=2048,
                    frame_step=50
                )
            ),
            [-1]  # Flatten the spectrogram
        )

        # Named `peak` to avoid shadowing the built-in `max`
        peak = tf.reduce_max(spectrogram_vector)

        return spectrogram_vector / peak

    return tf.data.Dataset.from_tensor_slices(signals).map(mock_input_pipeline)

# training_dataset = generate_mock_dataset()

In [None]:
'''
Demonstrates the short-time Fourier Transform.
'''
#@title STFT Demo { display-mode: "both" }
#@markdown Toggle cell execution:
run_cell = False #@param {type:"boolean"}

if run_cell:
    import numpy as np


    tau = np.pi * 2
    time_steps = tf.range(0, 11_000, dtype=tf.float32)
    sample_rate = 8000
    waveform = (
        .8 * tf.sin(tau * 1_000 * time_steps / sample_rate) + 
        .3 * tf.sin(tau * 2_000 * time_steps / sample_rate)
    )
    spectrogram = tf.signal.stft(waveform, 2048, 50).numpy()
    print('Spectrogram shape:', spectrogram.shape)
    IPython.display.display(IPython.display.Audio(waveform, rate=sample_rate))

    fig, axes = plt.subplots(2, figsize=(12, 8))
    timescale = np.arange(waveform.shape[0])
    axes[0].plot(timescale, waveform.numpy())
    axes[0].set_title('Waveform')
    axes[0].set_xlim([0, 1600])

    log_spec = np.log(tf.abs(spectrogram).numpy().T + np.finfo(float).eps)
    height = log_spec.shape[0]
    width = log_spec.shape[1]
    X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
    Y = range(height)
    axes[1].pcolormesh(X, Y, log_spec)
    axes[1].set_title('Spectrogram')
    plt.show()

    # Audio:
    reconstructed_signal = tf.signal.inverse_stft(
        stfts=tf.cast(spectrogram, tf.complex64),
        frame_length=2048,
        frame_step=50,  # Must match the frame step of the forward STFT
        window_fn=tf.signal.inverse_stft_window_fn(50),
    )

    IPython.display.display(IPython.display.Audio(reconstructed_signal, rate=sample_rate))

In [None]:
'''
Defines the autoencoder architecture.
''' 
import tensorflow.keras as keras


def build_autoencoder(input_dims, hidden_layer_sizes, latent_dims, batch_size):
    '''
    Creates a variational autoencoder.

    input_dims: An integer containing the dimensions of the model input.
    hidden_layer_sizes: A list containing the number of nodes for each hidden
        layer in the encoder, respectively. The decoder mirrors these sizes
        in reverse order.
    latent_dims: An integer containing the dimensions of the latent space
        representation.
    batch_size: An integer containing the number of examples per training
        batch.

    Returns: (Encoder, Decoder, Autoencoder)
        Encoder: The encoder model.
        Decoder: The decoder model.
        Autoencoder: The full autoencoder model.
    '''
    # Encoder architecture:
    input_layer = keras.Input(
        (input_dims,),
        name='encoder_input'
    ) 

    previous_layer = input_layer

    for node_count in hidden_layer_sizes:
        previous_layer = keras.layers.Dense(
            node_count,
            activation='relu'
        )(previous_layer)

    mean_layer = keras.layers.Dense(
        latent_dims,
        name='mean'
    )(previous_layer)

    log_variance_layer = keras.layers.Dense(
        latent_dims,
        name='log_variance'
    )(previous_layer)

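    def normal_sample(inputs):
        ''' Draws samples from a normal distribution. '''
        mean, log_variance = inputs

        # Generate one standard normal sample per latent value. Using the
        # runtime shape of `mean` also handles batches smaller than
        # `batch_size` (a final partial batch, or a single prediction).
        std_norm = tf.random.normal(shape=tf.shape(mean), mean=0, stddev=1)

        # exp(log_variance / 2) is the standard deviation
        sample = mean + tf.exp(log_variance / 2) * std_norm

        return sample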

    sample_layer = keras.layers.Lambda(normal_sample)(
        [mean_layer, log_variance_layer])

    encoder_outputs = [sample_layer, mean_layer, log_variance_layer]

    Encoder = keras.Model(input_layer, encoder_outputs, name='Encoder')

    # Decoder architecture:
    latent_space = keras.Input(
        (latent_dims,),
        name='decoder_input'
    )

    previous_layer = latent_space

    for node_count in reversed(hidden_layer_sizes):
        previous_layer = keras.layers.Dense(node_count, 'relu')(previous_layer)

    decoder_layers = keras.layers.Dense(input_dims, 'sigmoid')(previous_layer)

    Decoder = keras.Model(latent_space, decoder_layers, name='Decoder')


    # Complete autoencoder
    sample, mean, log_variance = Encoder(input_layer)

    #sample = keras.backend.print_tensor(sample, '\nsample:')
    
    reconstruction = Decoder(sample)
    
    Autoencoder = keras.Model(input_layer, reconstruction, name='autoencoder')

    def VAE_loss(inputs, reconstructions, log_variance_layer, mean_layer):
        ''' Custom loss function including a KL divergence term. '''
        '''
        reconstruction_loss = keras.losses.binary_crossentropy(
            inputs, reconstructions) * input_dims

        KL_loss = 1 + log_variance_layer - keras.backend.square(mean_layer) \
            - keras.backend.exp(log_variance_layer)

        KL_loss = keras.backend.sum(KL_loss, axis=-1) * -0.5

        total_loss = keras.backend.mean(reconstruction_loss + KL_loss)

        return total_loss
        '''
        # log_variance_layer = keras.backend.print_tensor(
        #    log_variance_layer, "\nLog variance:")
        
        # mean_layer = keras.backend.print_tensor(mean_layer, "Mean:")

        reconstruction_loss = keras.losses.binary_crossentropy(
            inputs, reconstructions) * input_dims

        # reconstruction_loss = tf.reduce_mean(
        #     keras.backend.square(inputs - reconstructions))
        
        kl_loss = -0.5 * tf.reduce_mean(
            1 + log_variance_layer - tf.square(mean_layer) - 
            tf.exp(log_variance_layer)
        )
 
        return keras.backend.mean(reconstruction_loss + kl_loss)

    Autoencoder.add_loss(
        VAE_loss(input_layer, reconstruction, log_variance, mean))

    return (Encoder, Decoder, Autoencoder)


In [None]:
'''  
Trains the model.
'''
batch_size = 44

input_shape = training_dataset.element_spec.shape[0]

keras.backend.clear_session()

encoder, decoder, autoencoder = build_autoencoder(
    input_shape, [256], 3, batch_size)

autoencoder.load_weights(DATASETS_PATH + 'Models/' + 'VAE_256-3/' + 'VAE_1.h5')

autoencoder.compile(
    optimizer=keras.optimizers.Adam(
        learning_rate=0.02
    ), 
    loss=None
)

training_history = autoencoder.fit(
    training_dataset.batch(batch_size), 
    epochs=40,
    shuffle=True,  # Ignored by Keras when the input is a `tf.data.Dataset`
)

In [None]:
'''  
Generates and presents a sample from the autoencoder.
'''
WRITE_WAVE_FILE = False

sample = list(np.random.normal(0, 1, 3)) # Random sample

# sample = [-4.8, -12, -23.9]

prediction = decoder.predict([sample])

reconstructed_spectrogram = tf.reshape(prediction, (180, -1))

reconstructed_signal = tf.signal.inverse_stft(
    stfts=tf.cast(reconstructed_spectrogram, tf.complex64),
    frame_length=2048,
    frame_step=50,
    window_fn=tf.signal.inverse_stft_window_fn(50),
)

reconstructed_spectrogram = reconstructed_spectrogram.numpy()


# Present the reconstructed output:
figure, axes = plt.subplots(2, figsize=(12, 8))

axes[0].plot(reconstructed_signal)

log_spec = np.log(reconstructed_spectrogram.T + np.finfo(float).eps)

height, width = log_spec.shape

X = np.linspace(0, np.size(reconstructed_spectrogram), num=width, dtype=int)

Y = range(height)

axes[1].pcolormesh(X, Y, log_spec)

axes[1].set_title('Spectrogram')

plt.show()

IPython.display.display(
    IPython.display.Audio(reconstructed_signal, rate=sample_rate))

# Save the WAV file:
if WRITE_WAVE_FILE:
    OUTPUT_DIRECTORY = DATASETS_PATH + 'Output/'
    
    file_count = len(
        [f for f in os.listdir(OUTPUT_DIRECTORY) if f.endswith('.wav')])
    
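    # `wave` expects integer PCM bytes, so convert the float signal to 16-bit
    pcm_signal = (
        np.clip(reconstructed_signal.numpy(), -1, 1) * 32_767
    ).astype('<i2')

    with wave.open(
          OUTPUT_DIRECTORY + f'MockingBot_{file_count}.wav', 'wb') as wav:
        wav.setparams(
            (
                1, # Channel count
                2, # Sample width in bytes
                sample_rate, # Sample rate
                len(pcm_signal), # Sample count
                'NONE',      # Compression type (must be 'NONE')
                'not compressed' # Compression name
            )
        )
        wav.writeframes(pcm_signal.tobytes())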

In [None]:
'''
Saves the VAE parameters.
'''
from datetime import datetime
import pytz


SAVE_MODEL_WEIGHTS = True

if SAVE_MODEL_WEIGHTS:
  
    # Take the current time directly in UTC instead of relabeling local time
    timestamp = datetime.now(pytz.utc).strftime('%m-%d-%y_%H-%M-%S')
    autoencoder.save_weights(DATASETS_PATH + 'Models/' + f'VAE_{timestamp}.h5')

I haven't yet been able to reproduce results similar to the MNIST project using a VAE. What I can try:
- Use the same architecture as the MNIST project, including the number of input dimensions
- Create a simpler dataset to see what architecture is required to get good results on various types of input (how architecture needs or does not need to scale with input length, how many latent dimensions are required, whether a CNN would work better)

I have determined that a VAE isn't going to give me the "creativity" I'm looking for, as it's optimizing for recreating the training examples. Consequently, I'm going to try a GAN instead.
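
To make that observation concrete, a standard VAE objective can be written out in a few lines. This is a minimal NumPy sketch, assuming inputs scaled to [0, 1] and a diagonal Gaussian posterior; `vae_loss` is a hypothetical helper, and it sums the KL term over the latent dimensions, which differs from the `tf.reduce_mean` used in the training cell.

```python
import numpy as np


def vae_loss(inputs, reconstructions, mean, log_variance, epsilon=1e-7):
    '''
    Mean over the batch of: binary cross-entropy summed over input dimensions
    plus KL(N(mean, exp(log_variance)) || N(0, 1)) summed over latent
    dimensions. Hypothetical helper for illustration only.
    '''
    # Clip to keep the logarithms finite
    reconstructions = np.clip(reconstructions, epsilon, 1 - epsilon)

    reconstruction_loss = -np.sum(
        inputs * np.log(reconstructions) +
        (1 - inputs) * np.log(1 - reconstructions),
        axis=-1
    )

    kl_loss = -0.5 * np.sum(
        1 + log_variance - np.square(mean) - np.exp(log_variance),
        axis=-1
    )

    return np.mean(reconstruction_loss + kl_loss)
```

The reconstruction term shrinks as the outputs approach the training inputs, while the KL term only keeps the latent distribution close to a standard normal; neither term rewards novelty.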