In [1]:
import numpy as np 

data = np.load('normalized_stft_spectrograms.npz')
loaded_spectrograms = [data[f'{i}'] for i in range(len(data))]

# shape of a STFT spectrogram = (1025, 2584)
# 1515 spectograms
print(len(loaded_spectrograms))

1515


In [2]:
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os

from tensorflow.keras import Sequential, Model, Input
from tensorflow.keras.layers import (Dense, Flatten, Reshape, Concatenate, Conv2D,
                                     UpSampling2D, BatchNormalization, MaxPooling2D, Conv2DTranspose)
tfd = tfp.distributions
tfb = tfp.bijectors
tfpl = tfp.layers






In [3]:

def get_prior(num_modes, latent_dim):
    """
    This function should create an instance of a MixtureSameFamily distribution
    according to the above specification.
    The function takes the num_modes and latent_dim as arguments, which should
    be used to define the distribution.
    Your function should then return the distribution instance.
    """
    gm = tfp.distributions.MixtureSameFamily(
        # the mixture_distribution should be fixed to a uniform
        # tfd.Categorical distribution, so that  pik=1/K  in the above equation.
        # This argument will therefore not contain any trainable variables
        mixture_distribution=tfp.distributions.Categorical(
            probs=[1.0/num_modes,]*num_modes),

        # The components_distribution should be a tfd.MultivariateNormalDiag
        # distribution batch shape equal to [num_modes] and event shape equal to [latent_dim].
        components_distribution = tfp.distributions.MultivariateNormalDiag(
          # should have trainable loc parameter (initialised with a random normal distribution)
          loc = tf.Variable(tf.random.normal(shape = [num_modes, latent_dim])),

          # and trainable scale_diag parameter (initialised to ones)
          # The scale_diag variable should be enforced to be positive using
          # tfp.util.TransformedVariable and the tfb.Softplus bijection
          scale_diag = tfp.util.TransformedVariable(
                                                tf.Variable(
                                                  tf.ones(shape = [num_modes, latent_dim])),
                                                bijector = tfp.bijectors.Softplus()
                                                )
        )
      )


    return gm


In [4]:
# Run your function to get the prior distribution with 2 components and latent_dim = 50

prior = get_prior(num_modes=2, latent_dim=100)

In [5]:
def get_kl_regularizer(prior_distribution):
    """
    This function should create an instance of the KLDivergenceRegularizer
    according to the above specification.
    The function takes the prior_distribution, which should be used to define
    the distribution.
    Your function should then return the KLDivergenceRegularizer instance.
    """
    reg = tfp.layers.KLDivergenceRegularizer(
        prior_distribution,
        weight = 1.0,
        use_exact_kl = False,
        test_points_fn = lambda q : q.sample(3),
        test_points_reduce_axis = (0,1))

    return reg

In [6]:
kl_regularizer = get_kl_regularizer(prior)

In [7]:

def get_encoder(latent_dim, kl_regularizer):
    """
    This function should build a CNN encoder model according to the above specification.
    The function takes latent_dim and kl_regularizer as arguments, which should be
    used to define the model.
    Your function should return the encoder model.
    """
    input_shape = (1024,2048,1)
    encoder = Sequential([
        Conv2D(filters = 32, kernel_size = 4, activation = 'relu',
               strides = (2,4), padding = 'SAME', input_shape = input_shape),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 4), strides=(2, 4), padding='SAME'), 
        
        Conv2D(filters = 64, kernel_size = 4, activation = 'relu',
               strides = (2,4), padding = 'SAME'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 4), strides=(2, 4), padding='SAME'),  
        
        Conv2D(filters = 128, kernel_size = 4, activation = 'relu',
               strides = (2,4), padding = 'SAME'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 4), strides=(2, 4), padding='SAME'),  

        Conv2D(filters = 256, kernel_size = 4, activation = 'relu',
               strides = (2,4), padding = 'SAME'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 4), strides=(2, 4), padding='SAME'),  

        Flatten(),
        Dense(tfp.layers.MultivariateNormalTriL.params_size(latent_dim)),
        tfp.layers.MultivariateNormalTriL(latent_dim, activity_regularizer = kl_regularizer)
    ])

    return encoder


In [8]:
encoder = get_encoder(latent_dim=100, kl_regularizer=kl_regularizer)





In [9]:
encoder.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 512, 512, 32)      544       
                                                                 
 batch_normalization (Batch  (None, 512, 512, 32)      128       
 Normalization)                                                  
                                                                 
 max_pooling2d (MaxPooling2  (None, 256, 128, 32)      0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 128, 32, 64)       32832     
                                                                 
 batch_normalization_1 (Bat  (None, 128, 32, 64)       256       
 chNormalization)                                                
                                                        

In [10]:
def get_decoder(latent_dim):
    decoder = Sequential([
        Dense(1024, activation='relu', input_shape=(latent_dim,)),
        Reshape((4, 1, 256)),
        Conv2DTranspose(256, kernel_size=3, strides=(2, 4), padding='SAME', activation='relu'),
        BatchNormalization(),
        
        Conv2DTranspose(128, kernel_size=3, strides=(2, 4), padding='SAME', activation='relu'),
        BatchNormalization(),

        Conv2DTranspose(64, kernel_size=3, strides=(4, 4), padding='SAME', activation='relu'),
        BatchNormalization(),

        Conv2DTranspose(32, kernel_size=3, strides=(4, 4), padding='SAME', activation='relu'),
        BatchNormalization(),

        Conv2DTranspose(1, kernel_size=3, strides=(4, 8), padding='SAME', activation='sigmoid'),
        Flatten(),
        tfp.layers.IndependentBernoulli(event_shape = (1024,2048,1))
    ])

    return decoder

In [11]:
decoder = get_decoder(latent_dim=100)

In [12]:
decoder.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_1 (Dense)             (None, 1024)              103424    
                                                                 
 reshape (Reshape)           (None, 4, 1, 256)         0         
                                                                 
 conv2d_transpose (Conv2DTr  (None, 8, 4, 256)         590080    
 anspose)                                                        
                                                                 
 batch_normalization_4 (Bat  (None, 8, 4, 256)         1024      
 chNormalization)                                                
                                                                 
 conv2d_transpose_1 (Conv2D  (None, 16, 16, 128)       295040    
 Transpose)                                                      
                                                      

In [27]:

def reconstruction_loss(batch_of_images, decoding_dist):
    """
    This function should compute and return the average expected reconstruction loss,
    as defined above.
    The function takes batch_of_images (Tensor containing a batch of input images to
    the encoder) and decoding_dist (output distribution of decoder after passing the
    image batch through the encoder and decoder) as arguments.
    The function should return the scalar average expected reconstruction loss.
    """
    return -tf.reduce_sum(decoding_dist.log_prob(batch_of_images), axis = 0)

In [28]:
vae = Model(inputs=encoder.inputs, outputs=decoder(encoder.outputs))
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
vae.compile(optimizer=optimizer, loss=reconstruction_loss)

In [29]:
data = np.load('stft_spectrograms.npz')
loaded_spectrograms_2 = np.array([data[f'{i}'] for i in range(len(data))])
vae.fit(loaded_spectrograms_2,loaded_spectrograms_2,epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0x295010b0b20>

In [37]:
def generate_music(prior, decoder, n_samples):
    """
    This function should compute generate new samples of images from the generative model,
    according to the above instructions.
    The function takes the prior distribution, decoder and number of samples as inputs, which
    should be used to generate the images.
    The function should then return the batch of generated images.
    """
    z = prior.sample(n_samples)
    return decoder(z).mean()

n_samples = 5
sm = generate_music(prior, decoder, n_samples)
print(sm[0])

tf.Tensor(
[[[0.6221782 ]
  [0.6223173 ]
  [0.62213784]
  ...
  [0.62228304]
  [0.62228304]
  [0.62228304]]

 [[0.6219318 ]
  [0.6218306 ]
  [0.62221146]
  ...
  [0.62228304]
  [0.62228304]
  [0.62228304]]

 [[0.62212867]
  [0.6220848 ]
  [0.6221737 ]
  ...
  [0.62228304]
  [0.62228304]
  [0.62228304]]

 ...

 [[0.6219224 ]
  [0.621839  ]
  [0.62217665]
  ...
  [0.62228304]
  [0.62228304]
  [0.62228304]]

 [[0.62212795]
  [0.62206656]
  [0.6221472 ]
  ...
  [0.62228304]
  [0.62228304]
  [0.62228304]]

 [[0.62228304]
  [0.62228304]
  [0.62228304]
  ...
  [0.62228304]
  [0.62228304]
  [0.62228304]]], shape=(1024, 2048, 1), dtype=float32)


In [41]:
import numpy as np
import librosa
import soundfile as sf

def stft_to_audio(tensor, output_path, sr=22050):
    # Convert the TensorFlow tensor to a numpy array
    tensor_np = tensor.numpy() if isinstance(tensor, tf.Tensor) else tensor

    # Assume tensor_np is just magnitude. You need to handle phase here.
    # This example assumes a "griffin-lim" phase reconstruction:
    y_reconstructed = librosa.griffinlim(tensor_np[:, :, 0])  # Drop channel dimension and use Griffin-Lim

    # Write the reconstructed audio
    sf.write(output_path, y_reconstructed, sr)

# Example usage:
# Assuming 'sm' is your tensor output from the VAE
stft_to_audio(sm[0], 'new.wav')
