## Mount Google Drive to access datasets and store the model

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; every dataset/model path used later
# in this notebook lives below that mount point.
drive.mount('/content/drive')

## Install all the required libraries for sound pre-processing and generating sound from log spectrograms

In [None]:
!pip3 install librosa

In [None]:
!pip install tensorflow-gpu==2.3.1
import tensorflow as tf
import numpy as np
import librosa

In [None]:
!pip install soundfile
import soundfile as sf

In [4]:
import os
import pickle

## Pre-Processing the Audio

- **STEPS** 
  - 1] Load the File
  - 2] Pad Signal (as needed)
  - 3] Extract log spectrograms from signal
  - 4] Normalize Spectrogram
  - 5] Save normalized spectrogram

In [5]:
"""Loads an audio file"""
class Loader:
  """Reads audio files with a fixed sample rate, duration and channel mode."""

  def __init__(self, sample_rate, duration, mono):
    """Remember the settings that every ``load`` call forwards to librosa.

    Args:
      sample_rate: Target sampling rate handed to ``librosa.load`` as ``sr``.
      duration: Maximum number of seconds to read from each file.
      mono: Channel mode flag; when False the audio is processed as stereo.
    """
    self.sample_rate = sample_rate
    self.duration = duration
    self.mono = mono

  def load(self, file_path):
    """Return the samples of ``file_path`` as produced by ``librosa.load``.

    librosa also reports the sampling rate it used; that value is dropped
    because only the signal itself is needed downstream.
    """
    load_options = {
        "sr": self.sample_rate,
        "mono": self.mono,
        "duration": self.duration,
    }
    samples, _ = librosa.load(path=file_path, **load_options)
    return samples


In [6]:
"""Apply padding to an array of signal as needed"""
class Padder:
  """Pads arrays at the front or the back using a configurable ``numpy.pad`` mode."""

  def __init__(self, mode="constant"):
    # "constant" is numpy's constant-value padding, which pads with zeros by default.
    self.mode = mode

  def left_pad(self, array, num_missing_items):
    """Prepend ``num_missing_items`` padded entries.

    Example with zero padding: [1, 2, 3] and 2 missing items -> [0, 0, 1, 2, 3].
    """
    return self._pad(array, before=num_missing_items, after=0)

  def right_pad(self, array, num_missing_items):
    """Append ``num_missing_items`` padded entries.

    Example with zero padding: [1, 2, 3] and 2 missing items -> [1, 2, 3, 0, 0].
    """
    return self._pad(array, before=0, after=num_missing_items)

  def _pad(self, array, before, after):
    """Run ``numpy.pad`` with ``pad_width=(before, after)`` and the stored mode."""
    return np.pad(array=array, pad_width=(before, after), mode=self.mode)

In [7]:
""" Extracts log-spectogram in decibels from a time-series signal"""
class LogSpectrogramExtractor:
  """Turns a time-series signal into a log-amplitude (decibel) spectrogram."""

  def __init__(self, frame_size, hop_length):
    """Store the STFT settings.

    Args:
      frame_size: Passed to ``librosa.stft`` as ``n_fft``.
      hop_length: Passed to ``librosa.stft`` as ``hop_length``.
    """
    self.hop_length = hop_length
    self.frame_size = frame_size

  def extract(self, signal):
    """Return the decibel-scaled magnitude spectrogram of ``signal``.

    ``librosa.stft`` yields a 2-D array of shape (1 + frame_size / 2, num_frames).
    The last row is dropped so the first dimension becomes an even number.
    """
    complex_stft = librosa.stft(y=signal,
                                n_fft=self.frame_size,
                                hop_length=self.hop_length)
    trimmed_stft = complex_stft[:-1]
    magnitudes = np.abs(trimmed_stft)
    return librosa.amplitude_to_db(magnitudes)

In [8]:
""" Applies Min-Max normalization to an array """
class MinMaxNormalizer:
  """Min-max scaling of an array into a configurable [min_val, max_val] range."""

  def __init__(self, min_val, max_val):
    """Store the target range.

    Args:
      min_val: Lower bound of the normalized range.
      max_val: Upper bound of the normalized range.
    """
    self.min = min_val
    self.max = max_val

  def normalize(self, array):
    """Rescale ``array`` so its smallest value maps to ``min`` and its largest to ``max``.

    A constant array has no spread to rescale; every element is then mapped to
    the lower bound instead of producing NaNs from a 0 / 0 division.
    """
    array_min = array.min()
    value_range = array.max() - array_min
    if value_range == 0:
      # (array - array_min) is all zeros here; reusing it keeps the shape and
      # dtype promotion identical to the regular path below.
      return (array - array_min) * 0.0 + self.min
    # Squash into [0, 1] first ...
    unit_scaled = (array - array_min) / value_range
    # ... then stretch/shift into [self.min, self.max].
    return unit_scaled * (self.max - self.min) + self.min

  def denormalize(self, norm_arr, og_min_val, og_max_val):
    """Invert ``normalize`` given the original minimum and maximum of the array.

    Args:
      norm_arr: Array previously scaled into [self.min, self.max].
      og_min_val: Minimum of the array before normalization.
      og_max_val: Maximum of the array before normalization.
    """
    unit_scaled = (norm_arr - self.min) / (self.max - self.min)
    return unit_scaled * (og_max_val - og_min_val) + og_min_val

  

In [9]:
"""Save features and min_max values"""
class Saver:
  """Writes normalized features (.npy) and the min/max lookup table (.pkl) to disk."""

  def __init__(self, feature_save_dir, min_max_values_save_dir):
    """Remember the two output directories.

    Args:
      feature_save_dir: Directory that receives one ``.npy`` file per feature.
      min_max_values_save_dir: Directory that receives ``min_max_values.pkl``.
    """
    self.feature_save_dir = feature_save_dir
    self.min_max_values_save_dir = min_max_values_save_dir

  def save_feature(self, feature, file_path):
    """Save ``feature`` with ``np.save`` and return the path it was written to.

    The target file is named after the source file: ``<basename>.npy`` inside
    ``feature_save_dir``. The directory is created first if it is missing.
    """
    save_path = self._generate_save_path(file_path)
    self._ensure_dir(self.feature_save_dir)
    np.save(save_path, feature)
    return save_path

  def save_min_max_values(self, min_max_values):
    """Pickle ``min_max_values`` to ``min_max_values.pkl`` in the min/max directory."""
    self._ensure_dir(self.min_max_values_save_dir)
    save_path = os.path.join(self.min_max_values_save_dir,
                             "min_max_values.pkl")
    self._save(min_max_values, save_path)

  @staticmethod
  def _ensure_dir(directory):
    """Create ``directory`` (and parents) unless it is empty or already exists."""
    # An empty string means "current directory"; os.makedirs("") would raise.
    if directory:
      os.makedirs(directory, exist_ok=True)

  @staticmethod
  def _save(data, save_path):
    """Pickle ``data`` to ``save_path``."""
    with open(save_path, "wb") as f:
      pickle.dump(data, f)

  def _generate_save_path(self, file_path):
    """Return ``<feature_save_dir>/<basename of file_path>.npy``."""
    file_name = os.path.basename(file_path)
    return os.path.join(self.feature_save_dir, file_name + ".npy")


In [10]:
""" Processes audio files in a directory applying the following steps to the files :
  - 1] Load the File
  - 2] Pad Signal (as needed)
  - 3] Extract log spectograms from signal
  - 4] Normalize Spectogram
  - 5] Save normalized spectogram

  Storing min-max values for all log spectograms (i.e. each audio file)
"""

class PreProcessingPipeLine:
  """Runs load -> pad -> extract -> normalize -> save for every file in a directory.

  The collaborators (``loader``, ``padder``, ``extractor``, ``normaliser``,
  ``saver``) are plain attributes assigned after construction. The minimum and
  maximum of each un-normalized feature are collected in ``min_max_values``,
  keyed by the path the feature was saved to, and written out once at the end
  of ``process``.
  """

  def __init__(self):
    self._loader = None
    self.padder = None
    self.extractor = None
    self.normaliser = None
    self.saver = None
    self.min_max_values = {}
    # Filled in by the ``loader`` setter.
    self.num_expected_samples = None

  @property
  def loader(self):
    """The object used to read audio files."""
    return self._loader

  @loader.setter
  def loader(self, loader):
    self._loader = loader
    # Signals shorter than sample_rate * duration samples get padded later on.
    expected = loader.sample_rate * loader.duration
    self.num_expected_samples = int(expected)

  def process(self, audio_files_dir):
    """Process every file found under ``audio_files_dir`` (recursively)."""
    for root, _, files in os.walk(audio_files_dir):
      for file in files:
        file_path = os.path.join(root, file)
        self._process_file(file_path)
        print(f"Processed file {file_path}")
    self.saver.save_min_max_values(self.min_max_values)

  def _process_file(self, file_path):
    """Run the full pipeline on one file and record its feature's min/max."""
    raw_signal = self.loader.load(file_path)
    if self._needs_padding(raw_signal):
      prepared_signal = self._apply_padding(raw_signal)
    else:
      prepared_signal = raw_signal
    feature = self.extractor.extract(prepared_signal)
    saved_to = self.saver.save_feature(self.normaliser.normalize(feature),
                                       file_path)
    # The min/max come from the feature *before* normalization so the original
    # scale can be restored later.
    self._store_min_max_value(saved_to, feature.min(), feature.max())

  def _needs_padding(self, signal):
    """True when ``signal`` has fewer samples than expected."""
    return len(signal) < self.num_expected_samples

  def _apply_padding(self, signal):
    """Right-pad ``signal`` up to the expected number of samples."""
    shortfall = self.num_expected_samples - len(signal)
    return self.padder.right_pad(signal, shortfall)

  def _store_min_max_value(self, save_path, min_val, max_val):
    """Record ``{"min": ..., "max": ...}`` under ``save_path``."""
    entry = {"min": min_val, "max": max_val}
    self.min_max_values[save_path] = entry



## Define constants and instantiate all objects

In [11]:
# STFT settings handed to LogSpectrogramExtractor (used there as n_fft / hop_length).
FRAME_SIZE = 512
HOP_LENGTH = 256
# Loader settings: each clip is read for at most DURATION seconds at SAMPLE_RATE;
# the pipeline pads shorter clips up to int(SAMPLE_RATE * DURATION) samples.
# NOTE(review): with librosa's default centred STFT these values should yield
# 256 x 64 spectrograms (257 bins minus the dropped last one, 1 + 16317 // 256
# frames), which is the input_shape given to the VAE later on -- confirm.
DURATION = 0.74 #seconds
SAMPLE_RATE = 22050
MONO = True
# Output locations for the normalized spectrograms and the min/max pickle, and
# the directory that is walked for input recordings.
SPECTROGRAMS_SAVE_DIR = "/content/drive/MyDrive/Audio_Dataset/spectograms_save_dir/"
MIN_MAX_VALUES_SAVE_DIR = "/content/drive/MyDrive/Audio_Dataset/minmax_vals_save_dir/"
FILES_DIR = "/content/drive/MyDrive/Audio_Dataset/recordings/"

# instantiate all objects
loader = Loader(SAMPLE_RATE, DURATION, MONO)
padder = Padder()
log_spectrogram_extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
# Spectrograms are scaled into [0, 1].
min_max_normaliser = MinMaxNormalizer(0, 1)
saver = Saver(SPECTROGRAMS_SAVE_DIR, MIN_MAX_VALUES_SAVE_DIR)

In [None]:
# Wire the collaborators into the pipeline. Assigning `loader` also derives the
# expected number of samples per clip from its sample rate and duration.
preprocessing_pipeline = PreProcessingPipeLine()
preprocessing_pipeline.loader = loader
preprocessing_pipeline.padder = padder
preprocessing_pipeline.extractor = log_spectrogram_extractor
preprocessing_pipeline.normaliser = min_max_normaliser
preprocessing_pipeline.saver = saver

# Walk FILES_DIR, save one normalized spectrogram per file, then write the
# collected min/max values.
preprocessing_pipeline.process(FILES_DIR)


In [13]:
""" Function to load the Free Sound Digits Dataset into our vae model"""
def load_fsdd(spectograms_path):
  """Load every saved spectrogram (``.npy``) found under ``spectograms_path``.

  Directories and files are visited in sorted order so the returned arrays and
  paths line up the same way on every run; files without a ``.npy`` extension
  are ignored instead of being handed to ``np.load``.

  Args:
    spectograms_path: Root directory that is searched recursively.

  Returns:
    A tuple ``(x_train, file_paths)``. ``x_train`` stacks the loaded arrays and
    gets a trailing axis of size 1 appended (the channel axis the convolutional
    model expects); ``file_paths[i]`` is the file ``x_train[i]`` came from.
  """
  x_train = []
  file_paths = []
  for root, sub_dirs, filenames in os.walk(spectograms_path):
    # Sorting in place also fixes the order in which os.walk descends.
    sub_dirs.sort()
    for file_name in sorted(filenames):
      if not file_name.endswith(".npy"):
        continue
      filepath = os.path.join(root, file_name)
      x_train.append(np.load(file=filepath))
      file_paths.append(filepath)
  # Stacked shape: (num_samples, num_bins, num_frames).
  x_train = np.array(x_train)
  # Append the channel axis: (num_samples, num_bins, num_frames, 1).
  x_train = x_train[..., np.newaxis]
  return x_train, file_paths

## Functions required to build and save the VAE model

In [14]:
from tensorflow.keras import backend
# Eager execution is switched off for this VAE.
# NOTE(review): presumably required because the KL loss reads the symbolic
# `mu` / `log_var` tensors kept on the VAE object rather than values derived
# from the loss function's own arguments -- confirm.
tf.compat.v1.disable_eager_execution()

class VAE:
  """Convolutional variational autoencoder assembled from Keras layers.

  The encoder stacks Conv2D -> ReLU -> BatchNormalization blocks and ends in a
  latent vector sampled from the learned ``mu`` / ``log_var``. The decoder
  mirrors it with Conv2DTranspose blocks and a sigmoid output. ``encoder``,
  ``decoder`` and the end-to-end ``model`` are built by the constructor.

  Args:
    input_shape: Shape of one input sample, given to the encoder input layer.
    conv_filters: Number of filters for each conv layer.
    conv_kernels: Kernel size for each conv layer.
    conv_strides: Stride for each conv layer.
    latent_dim_space: Size of the latent vector.
  """

  def __init__(
      self,
      input_shape,
      conv_filters,
      conv_kernels,
      conv_strides,
      latent_dim_space):
    # Hyper-parameters supplied by the caller.
    self.input_shape = input_shape
    self.conv_filters = conv_filters
    self.conv_kernels = conv_kernels
    self.conv_strides = conv_strides
    self.latent_dim_space = latent_dim_space
    # Factor applied to the reconstruction term in the combined loss.
    self.recon_loss_weight = 1000000

    # Keras models, populated by _build().
    self.encoder = None
    self.decoder = None
    self.model = None

    # Values derived from the supplied hyper-parameters.
    self.num_conv_layers = len(self.conv_filters)
    # Shape of the last conv output (without batch axis); the decoder reshapes to it.
    self.prev_shape = None

    self._model_input = None

    self._build()

  def summary(self):
    """Print the Keras summary of the encoder, the decoder and the full model."""
    self.encoder.summary()
    self.decoder.summary()
    self.model.summary()

  def _build(self):
    """Build all components; the encoder goes first because the decoder needs prev_shape."""
    self._build_encoder()
    self._build_decoder()
    self._build_vae()

  def train(self, x_train, batch_size, num_epochs):
    """Fit the model to reproduce ``x_train`` (inputs double as targets)."""
    self.model.fit(x_train,
                   x_train,
                   batch_size=batch_size,
                   epochs=num_epochs,
                   shuffle=True)

  def _build_vae(self):
    """Chain encoder and decoder into the end-to-end model."""
    model_input = self._model_input
    model_output = self.decoder(self.encoder(model_input))
    self.model = tf.keras.Model(model_input, model_output, name="vae")

  def _build_encoder(self):
    """Build the encoder: input -> conv blocks -> sampled bottleneck."""
    encoderInput = self._add_encoder_input()
    conv_layers = self._add_conv_layers(encoderInput)
    encoderOutput = self._add_bottleneck(conv_layers)
    self._model_input = encoderInput
    self.encoder = tf.keras.Model(encoderInput, encoderOutput, name="encoder")

  def _add_encoder_input(self):
    """Create the encoder's input layer."""
    return tf.keras.layers.Input(shape=self.input_shape, name="encoder_input")

  def _add_bottleneck(self, conv_layers):
    """Flatten the conv output and add the Gaussian-sampling output layer.

    Stores ``prev_shape``, ``mu`` and ``log_var`` on the instance; the decoder
    and the KL loss read them later.
    """
    x = conv_layers
    # int_shape is (batch_size, dim1, dim2, num_channels); drop the batch axis.
    self.prev_shape = backend.int_shape(x)[1:]
    x = tf.keras.layers.Flatten()(x)

    # mean
    self.mu = tf.keras.layers.Dense(self.latent_dim_space, name="mu")(x)
    # log variance
    self.log_var = tf.keras.layers.Dense(self.latent_dim_space, name="log_var")(x)

    def sample_pt_from_normal_dist(args):
      # Reparameterization: z = mu + sigma * epsilon, with sigma = exp(log_var / 2).
      mu, log_var = args
      # mean = 0 and stddev = 1 --> standard normal noise with the same shape as mu
      epsilon = tf.keras.backend.random_normal(shape=tf.keras.backend.shape(mu), mean=0.0, stddev=1.0)
      return mu + tf.keras.backend.exp(log_var / 2) * epsilon

    x = tf.keras.layers.Lambda(sample_pt_from_normal_dist, name="Encoder_Output")([self.mu, self.log_var])
    return x

  def _add_conv_layers(self, encoderInput):
    """Append one conv block per entry in ``conv_filters``."""
    x = encoderInput
    for layer_idx in range(self.num_conv_layers):
      x = self._add_conv_layer(layer_idx, x)
    return x

  def _add_conv_layer(self, layer_idx, x):
    """Append one encoder block: Conv2D + ReLU + BatchNormalization."""
    conv_layer = tf.keras.layers.Conv2D(
        filters=self.conv_filters[layer_idx],
        kernel_size=self.conv_kernels[layer_idx],
        strides=self.conv_strides[layer_idx],
        padding="same",
        name=f"Encoder_Conv_Layer-{layer_idx+1}",
    )

    x = conv_layer(x)
    x = tf.keras.layers.ReLU(name=f"Encoder_ReLU_{layer_idx + 1}")(x)
    x = tf.keras.layers.BatchNormalization(name=f"Encoder_BatchNormalization_{layer_idx+1}")(x)

    return x

  def _build_decoder(self):
    """Build the decoder: input -> dense -> reshape -> transposed conv blocks -> output."""
    decoderInput = self._add_decoder_input()
    dense_layer = self._add_dense_layer(decoderInput)
    reshape_layer = self._add_reshape_layer(dense_layer)
    conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
    decoderOutput = self._add_decoder_output(conv_transpose_layers)
    self.decoder = tf.keras.Model(decoderInput, decoderOutput, name="decoder")

  def _add_decoder_input(self):
    """Create the decoder's input layer (one latent vector per sample)."""
    return tf.keras.layers.Input(shape=self.latent_dim_space, name="decoder_input")

  def _add_dense_layer(self, decoderInput):
    """Add a dense layer with as many units as the last conv output has elements."""
    num_units = np.prod(self.prev_shape)  # prev_shape = [x, y, z] --> prod = x*y*z
    return tf.keras.layers.Dense(units=num_units, name="decoder_dense_layer")(decoderInput)

  def _add_reshape_layer(self, dense_layer):
    """Reshape the dense output back to the last conv output's shape."""
    reshape_layer = tf.keras.layers.Reshape(target_shape=self.prev_shape)(dense_layer)
    return reshape_layer

  def _add_conv_transpose_layers(self, x):
    """Add the transposed conv blocks, walking the encoder's layers in reverse."""
    # Index 0 is skipped here; its settings are used by the decoder output layer.
    for layer_idx in reversed(range(1, self.num_conv_layers)):
      x = self._add_conv_transpose_layer(layer_idx, x)
    return x

  def _add_conv_transpose_layer(self, layer_idx, x):
    """Append one decoder block: Conv2DTranspose + ReLU + BatchNormalization."""
    # Number the decoder layers 1..n in the order they are added.
    layer_num = self.num_conv_layers - layer_idx

    conv_trans_layer = tf.keras.layers.Conv2DTranspose(
        filters=self.conv_filters[layer_idx],
        kernel_size=self.conv_kernels[layer_idx],
        strides=self.conv_strides[layer_idx],
        padding="same",
        name=f"decoder_conv_transpose_layer_{layer_num}"
    )

    x = conv_trans_layer(x)
    x = tf.keras.layers.ReLU(name=f"decoder_ReLU_{layer_num}")(x)
    x = tf.keras.layers.BatchNormalization(name=f"decoder_batchnormalization_{layer_num}")(x)
    return x

  def _add_decoder_output(self, x):
    """Add the final transposed convolution followed by a sigmoid activation."""
    # NOTE(review): this layer emits conv_filters[0] channels rather than the
    # channel count of input_shape, so the loss has to broadcast the target
    # against the prediction. It is left unchanged because a different filter
    # count would make previously saved weights unloadable -- confirm intent.
    conv_trans_layer = tf.keras.layers.Conv2DTranspose(
        filters=self.conv_filters[0],
        kernel_size=self.conv_kernels[0],
        strides=self.conv_strides[0],
        padding="same",
        name=f"decoder_final_conv_trans_layer_{self.num_conv_layers}",
    )

    x = conv_trans_layer(x)
    outputLayer = tf.keras.layers.Activation("sigmoid", name="decoder_output_layer")(x)
    return outputLayer

  def compile(self, learning_rate=0.0001):
    """Compile the model with Adam, the combined loss and both loss terms as metrics."""
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    self.model.compile(optimizer=optimizer,
                       loss=self._calculate_combined_loss,
                       metrics=[self._calculate_reconstruction_loss,
                                self._calculate_kl_loss])

  def _calculate_reconstruction_loss(self, y_target, y_pred):
    """Mean squared error per sample, averaged over axes 1, 2 and 3."""
    error = y_target - y_pred
    recon_loss = tf.keras.backend.mean(tf.keras.backend.square(error), axis=[1, 2, 3])
    return recon_loss

  def _calculate_kl_loss(self, y_target, y_pred):
    """Kullback-Leibler term computed from the stored ``mu`` / ``log_var`` tensors.

    ``y_target`` and ``y_pred`` are unused; they are only part of the signature
    because Keras calls losses and metrics with these two arguments.
    """
    # kl_loss = -1/2 * sum(1 + log(variance) - mean^2 - variance),
    # where variance = exp(log_var).
    kl_loss = -0.5 * tf.keras.backend.sum(1 + self.log_var - tf.keras.backend.square(self.mu) -
                                          tf.keras.backend.exp(self.log_var),
                                          axis=1)
    return kl_loss

  def _calculate_combined_loss(self, y_target, y_pred):
    """Weighted reconstruction loss plus the KL term."""
    recon_loss = self._calculate_reconstruction_loss(y_target, y_pred)
    kl_loss = self._calculate_kl_loss(y_target, y_pred)
    combined_loss = self.recon_loss_weight * recon_loss + kl_loss

    return combined_loss

  def save(self, save_folder="."):
    """Write ``params.pkl`` and ``weights.h5`` into ``save_folder``."""
    self._create_save_dir(save_folder)
    self._save_params(save_folder)
    self._save_weights(save_folder)

  def _create_save_dir(self, sfolder):
    """Create ``sfolder`` (and parents) if it does not exist yet."""
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(sfolder, exist_ok=True)

  def _save_params(self, sfolder):
    """Pickle the constructor arguments, in constructor order, to ``params.pkl``."""
    params = [self.input_shape,    # Shape of the input the model will get
              self.conv_filters,   # Number of filters for each layer
              self.conv_kernels,   # Kernel size for each layer
              self.conv_strides,   # Stride for each layer
              self.latent_dim_space,
              ]
    save_path = os.path.join(sfolder, "params.pkl")
    with open(save_path, "wb") as f:
      pickle.dump(params, f)

  def _save_weights(self, sfolder):
    """Save the end-to-end model's weights to ``weights.h5``."""
    save_path = os.path.join(sfolder, "weights.h5")
    self.model.save_weights(save_path)

  @classmethod
  def load(cls, sfolder="."):
    """Rebuild a model from ``params.pkl`` and ``weights.h5`` in ``sfolder``.

    Being a classmethod, this works both as ``VAE.load(folder)`` and when
    called on an existing instance; in either case a new object is returned.
    """
    param_path = os.path.join(sfolder, "params.pkl")
    # SECURITY: unpickling executes arbitrary code; only load folders you trust.
    with open(param_path, "rb") as f:
      params = pickle.load(f)
    vae = cls(*params)
    weights_path = os.path.join(sfolder, "weights.h5")
    vae.load_weights(weights_path)
    return vae

  def load_weights(self, weights_path):
    """Load weights for the end-to-end model from ``weights_path``."""
    self.model.load_weights(weights_path)

  def reconstruct(self, images):
    """Encode then decode ``images``.

    Returns:
      ``(reconstructed_images, latent_representations)``.
    """
    latent_representations = self.encoder.predict(images)
    reconstructed_images = self.decoder.predict(latent_representations)
    return reconstructed_images, latent_representations

In [15]:
# Load the saved spectrograms for training; the matching file paths are not needed here.
training_set, _= load_fsdd("/content/drive/MyDrive/Audio_Dataset/spectograms_save_dir")

## Prepare the VAE for handling sound.

In [16]:
# Five conv layers; every layer uses a kernel size of 3 and a stride of 2,
# except the last one, whose stride is (2, 1).
soundVAE = VAE(
    input_shape=(256, 64, 1),
    conv_filters=(512,256,128,64,32),
    conv_kernels=(3,3,3,3,3),
    conv_strides=(2,2,2,2,(2,1)),
    latent_dim_space=128,
)


In [None]:
# Training hyper-parameters.
LEARNING_RATE = 0.0005
BATCH_SIZE = 64
EPOCHS = 200

# Print the architecture, compile, train on the spectrograms (inputs are also
# the targets), then write params.pkl and weights.h5 into ./model.
soundVAE.summary()
soundVAE.compile(LEARNING_RATE)
soundVAE.train(x_train=training_set, batch_size=BATCH_SIZE, num_epochs=EPOCHS)
soundVAE.save('model')
# Reset the global Keras state once the model has been saved.
tf.keras.backend.clear_session()

In [17]:
from contextlib import redirect_stdout

# summary() prints to stdout, so stdout is redirected into the file to keep a
# copy of the architecture in modelsummary.txt.
with open('modelsummary.txt', 'w') as f:
    with redirect_stdout(f):
        soundVAE.summary()

## Generate Sound from Log Spectrograms

In [18]:
"""Responsible for generating audio from spectograms"""
class SoundGenerator:
  """Turns spectrograms into audio signals with the help of a trained VAE."""

  def __init__(self, vae, hop_length):
    """Keep the model and the hop length used for the inverse STFT.

    Args:
      vae: Object exposing ``reconstruct(spectrograms)``.
      hop_length: Hop length passed to ``librosa.istft``.
    """
    self.vae = vae
    self.hop_length = hop_length
    # Spectrograms are assumed to have been scaled into [0, 1] when they were saved.
    self._min_max_normalizer = MinMaxNormalizer(0, 1)

  def generator(self, spectograms, min_max_values):
    """Reconstruct ``spectograms`` with the VAE and convert the result to audio.

    Returns:
      ``(signals, latent_representations)``.
    """
    reconstructed, latent_reps = self.vae.reconstruct(spectograms)
    audio_signals = self.convert_spec_to_audio(reconstructed, min_max_values)
    return audio_signals, latent_reps

  def convert_spec_to_audio(self, spectrograms, min_max_values):
    """Convert each normalized log spectrogram into a time-domain signal.

    ``min_max_values`` supplies, per spectrogram, a dict with the ``"min"`` and
    ``"max"`` needed to undo the normalization.
    """
    return [self._spec_to_signal(spec, bounds)
            for spec, bounds in zip(spectrograms, min_max_values)]

  def _spec_to_signal(self, spec, bounds):
    """Denormalize one spectrogram, go from decibels to amplitude and invert the STFT."""
    # Keep the first two axes and take index 0 of the third (channel) axis.
    log_spec_2d = spec[:, :, 0]
    denormalized = self._min_max_normalizer.denormalize(
        log_spec_2d, bounds["min"], bounds["max"]
    )
    # Decibels --> linear amplitude.
    amplitude_spec = librosa.db_to_amplitude(denormalized)
    # Plain inverse STFT on the magnitude spectrogram.
    return librosa.istft(amplitude_spec, hop_length=self.hop_length)



In [21]:
# Same hop length as the one used when the spectrograms were extracted.
HOP_LENGTH = 256
# Output folders for the audio rebuilt from the original and the generated spectrograms.
SAVE_DIR_OG = "/content/drive/MyDrive/Sound_DIR/OG_SOUNDS"
SAVE_DIR_GEN = "/content/drive/MyDrive/Sound_DIR/GEN_SOUNDS"
# Pickle written by the preprocessing step: saved spectrogram path -> {"min", "max"}.
MIN_MAX_VALUES_PATH = "/content/drive/MyDrive/Audio_Dataset/minmax_vals_save_dir/min_max_values.pkl"

In [28]:
def select_spectrograms(spectrograms,
                        file_paths,
                        min_max_values,
                        num_spectrograms=2):
    """Randomly pick spectrograms together with their stored min/max values.

    Indexes are drawn with ``np.random.choice`` using its defaults. The chosen
    file paths and min/max entries are printed before returning.

    Args:
        spectrograms: Array indexed by sample along its first axis.
        file_paths: Path of the file each spectrogram was loaded from.
        min_max_values: Mapping from file path to its min/max entry.
        num_spectrograms: How many samples to draw.

    Returns:
        ``(sampled_spectrograms, sampled_min_max_values)``.
    """
    candidate_indexes = range(len(spectrograms))
    picks = np.random.choice(candidate_indexes, num_spectrograms)
    picked_spectrograms = spectrograms[picks]
    picked_paths = []
    picked_min_max = []
    for pick in picks:
        picked_paths.append(file_paths[pick])
    for path in picked_paths:
        picked_min_max.append(min_max_values[path])
    print(picked_paths)
    print(picked_min_max)
    return picked_spectrograms, picked_min_max



def save_signals(signals, save_dir, sample_rate=22050):
    """Write each signal to ``<save_dir>/<index>.wav`` using soundfile.

    The directory is created first when it is missing, so a run does not fail
    at the very last step just because the output folder was never set up.

    Args:
        signals: Iterable of audio signals; file names follow iteration order.
        save_dir: Directory that receives the ``.wav`` files.
        sample_rate: Sample rate written into each file.
    """
    # An empty string means "current directory"; os.makedirs("") would raise.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    for i, signal in enumerate(signals):
        save_path = os.path.join(save_dir, str(i) + ".wav")
        sf.write(save_path, signal, sample_rate)


In [None]:
# initialise sound generator
# initialise sound generator
# `load` is invoked through the existing soundVAE object; it builds and returns
# a new VAE from the params.pkl / weights.h5 stored in the given folder.
svae = soundVAE.load("/content/drive/MyDrive/Trained_Sound_VAE")
soundGen = SoundGenerator(svae, HOP_LENGTH)

# load spectrograms + min max values
with open(MIN_MAX_VALUES_PATH, "rb") as f:
    min_max_values = pickle.load(f)

specs, file_paths = load_fsdd("/content/drive/MyDrive/Audio_Dataset/spectograms_save_dir")

# sample spectrograms + min max values (5 randomly chosen samples)
sampled_specs, sampled_min_max_values = select_spectrograms(specs,
                                                            file_paths,
                                                            min_max_values,
                                                            5)

# generate audio for sampled spectrograms (passed through the VAE first);
# the latent representations are not used here
signals, _ = soundGen.generator(sampled_specs,
                                      sampled_min_max_values)

# convert the original (un-reconstructed) spectrogram samples to audio for comparison
original_signals = soundGen.convert_spec_to_audio(
    sampled_specs, sampled_min_max_values)

# save audio signals
save_signals(signals, SAVE_DIR_GEN)
save_signals(original_signals, SAVE_DIR_OG)