In [None]:
import os
import pickle
import librosa
import numpy as np

arr=[]
lis=[]
class Loader:
    def __init__(self, sample_rate, duration, mono):
        self.sample_rate = sample_rate
        self.duration = duration
        self.mono = mono
    
    def load(self, file_path):
        signal = librosa.load(file_path,
                              sr=self.sample_rate,
                              duration=self.duration,
                              mono=self.mono)[0]
        lis.append(len(signal))
        return signal


class Padder:
    """Padder is responsible to apply padding to an array."""

    def __init__(self, mode="constant"):
        self.mode = mode

    def left_pad(self, array, num_missing_items):
        padded_array = np.pad(array,
                              (num_missing_items, 0),
                              mode=self.mode)
        return padded_array

    def right_pad(self, array, num_missing_items):
        padded_array = np.pad(array,
                              (0, num_missing_items),
                              mode=self.mode)
        return padded_array


class LogSpectrogramExtractor:
    """LogSpectrogramExtractor extracts log spectrograms (in dB) from a
    time-series signal."""

    def __init__(self, frame_size, hop_length):
        self.frame_size = frame_size
        self.hop_length = hop_length

    def extract(self, signal):
        stft = librosa.stft(signal,
                            n_fft=self.frame_size,
                            hop_length=self.hop_length)[:-1]
        spectrogram = np.abs(stft)
        log_spectrogram = librosa.amplitude_to_db(spectrogram)
        arr.append(log_spectrogram)
        print(log_spectrogram.shape)
        return log_spectrogram


class MinMaxNormaliser:
    """MinMaxNormaliser applies min max normalisation to an array."""

    def __init__(self, min_val, max_val):
        self.min = min_val
        self.max = max_val

    def normalise(self, array):
        norm_array = (array - array.min()) / (array.max() - array.min())
        norm_array = norm_array * (self.max - self.min) + self.min
        return norm_array

    def denormalise(self, norm_array, original_min, original_max):
        array = (norm_array - self.min) / (self.max - self.min)
        array = array * (original_max - original_min) + original_min
        return array


class Saver:
    """saver is responsible to save features, and the min max values."""

    def __init__(self, feature_save_dir, min_max_values_save_dir):
        self.feature_save_dir = feature_save_dir
        self.min_max_values_save_dir = min_max_values_save_dir

    def save_feature(self, feature, file_path):
        save_path = self._generate_save_path(file_path)
        np.save(save_path, feature)

    def save_min_max_values(self, min_max_values):
        save_path = os.path.join(self.min_max_values_save_dir,
                                 "min_max_values.pkl")
        self._save(min_max_values, save_path)

    @staticmethod
    def _save(data, save_path):
        with open(save_path, "wb") as f:
            pickle.dump(data, f)

    def _generate_save_path(self, file_path):
        file_name = os.path.split(file_path)[1]
        save_path = os.path.join(self.feature_save_dir, file_name + ".npy")
        return save_path


class PreprocessingPipeline:
    """PreprocessingPipeline processes audio files in a directory, applying
    the following steps to each file:
        1- load a file
        2- pad the signal (if necessary)
        3- extracting log spectrogram from signal
        4- normalise spectrogram
        5- save the normalised spectrogram
    Storing the min max values for all the log spectrograms.
    """

    def __init__(self):
        self.padder = None
        self.extractor = None
        self.normaliser = None
        self.saver = None
        self.min_max_values = {}
        self._loader = None
        self._num_expected_samples = None

    @property
    def loader(self):
        return self._loader

    @loader.setter
    def loader(self, loader):
        self._loader = loader
        self._num_expected_samples = int(loader.sample_rate * loader.duration)

    def process(self, audio_files_dir):
        for root, _, files in os.walk(audio_files_dir):
            for file in files:
                file_path = os.path.join(root, file)
                self._process_file(file_path)
                print(f"Processed file {file_path}")
        self.saver.save_min_max_values(self.min_max_values)

    def _process_file(self, file_path):
        signal = self.loader.load(file_path)
        if self._is_padding_necessary(signal):
            signal = self._apply_padding(signal)
        feature = self.extractor.extract(signal)
        norm_feature = self.normaliser.normalise(feature)
        save_path = self.saver.save_feature(norm_feature, file_path)
        self._store_min_max_value(save_path, feature.min(), feature.max())

    def _is_padding_necessary(self, signal):
        if len(signal) < self._num_expected_samples:
            return True
        return False

    def _apply_padding(self, signal):
        num_missing_samples = self._num_expected_samples - len(signal)
        padded_signal = self.padder.right_pad(signal, num_missing_samples)
        return padded_signal

    def _store_min_max_value(self, save_path, min_val, max_val):
        self.min_max_values[save_path] = {
            "min": min_val,
            "max": max_val
        }

if __name__ == "__main__":
    FRAME_SIZE = 256
    HOP_LENGTH = 512
    DURATION = 3.2 # in seconds
    SAMPLE_RATE = 20000
    MONO = True

    SPECTROGRAMS_SAVE_DIR = "spg/"
    MIN_MAX_VALUES_SAVE_DIR = "nan/"
    FILES_DIR = "wav/"

    # instantiate all objects
    loader = Loader(SAMPLE_RATE, DURATION, MONO)
    padder = Padder()
    log_spectrogram_extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
    min_max_normaliser = MinMaxNormaliser(0, 1)
    saver = Saver(SPECTROGRAMS_SAVE_DIR, MIN_MAX_VALUES_SAVE_DIR)

    preprocessing_pipeline = PreprocessingPipeline()
    preprocessing_pipeline.loader = loader
    preprocessing_pipeline.padder = padder
    preprocessing_pipeline.extractor = log_spectrogram_extractor
    preprocessing_pipeline.normaliser = min_max_normaliser
    preprocessing_pipeline.saver = saver

    preprocessing_pipeline.process(FILES_DIR)
    my_list=np.array(arr)
    print(max(lis))
    my_list.shape

In [8]:
arr

[array([[-57.997017 , -57.997017 , -21.955479 , ..., -28.678642 ,
         -28.720999 , -18.967243 ],
        [-57.997017 , -57.997017 , -12.204554 , ..., -14.637059 ,
         -19.233751 , -19.473001 ],
        [-57.997017 , -57.997017 ,  -7.1173267, ...,  -6.234675 ,
         -18.02274  , -22.050638 ],
        ...,
        [-57.997017 , -57.997017 , -57.997017 , ..., -57.997017 ,
         -57.997017 , -51.658867 ],
        [-57.997017 , -57.997017 , -57.997017 , ..., -57.997017 ,
         -57.997017 , -51.695107 ],
        [-57.997017 , -57.997017 , -57.997017 , ..., -57.997017 ,
         -57.997017 , -51.75692  ]], dtype=float32),
 array([[-62.317608 , -62.317608 , -61.98342  , ...,  -3.7948027,
          -2.0781693,  -5.8237915],
        [-62.317608 , -62.317608 , -62.317608 , ...,   4.9976315,
           4.496003 ,  -2.0145311],
        [-62.317608 , -62.317608 , -54.58201  , ...,   6.581832 ,
           1.8842149,   0.8825408],
        ...,
        [-62.317608 , -62.317608 , -62.

In [9]:
arr=np.array(arr)
arr.shape

(452, 128, 126)

In [None]:
from _future_ import print_function, division

from keras.layers import Input
from keras.models import Model
from keras.optimizers import Adam
from optparse import OptionParser
from model import train, build_audio_generator, build_audio_discriminator

import os

#turn off debug information
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

def main():

    parser = OptionParser()

    # Only required for labeling - Defines train or generate mode
    parser.add_option('-m', '--mode', help='train or gen', dest='mode', default = 'label')
    # Only required for labeling - Enter Model id here
    parser.add_option('-u', '--uid', help='enter model id here')

    epochs = 1000

    (options, args) = parser.parse_args()

    training_data_path = 'data/cv-valid-train/*.wav'

    if options.mode == 'train':

        frame_size = 500
        frame_shift = 128

        audio_shape_disc = (frame_size,256)

        audio_shape_gen = (frame_size, 256)

        optimizer = Adam(0.0002, 0.5)

        # Build and compile the discriminator
        audio_discriminator = build_audio_discriminator(audio_shape_disc)
        audio_discriminator.compile(loss='binary_crossentropy', optimizer=optimizer)

        # Build the generator
        audio_generator = build_audio_generator(audio_shape_gen, frame_size)

        # The generator takes noise
        noise = Input(shape=audio_shape_gen)

        audio = audio_generator(noise)

        # For the combined model we will only train the generator
        # audio_discriminator.trainable = False

        # The discriminator takes generated audio as input and determines validity
        audio_valid = audio_discriminator(audio)

        # The combined model  (stacked generator and discriminator) takes
        # noise as input => generates audio => determines validity
        audio_combined = Model(noise, audio_valid)
        audio_combined.compile(loss='binary_crossentropy', optimizer=optimizer)

        train(training_data_path, audio_generator, audio_discriminator, audio_combined, epochs, frame_size, frame_shift)

if _name_ == '_main_':
    main()

In [51]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import Dense,Flatten,Input,Reshape
from keras.layers import Conv2D, MaxPooling2D

def generator():
    model = tf.keras.Sequential([
        #tf.keras.layers.Reshape((16, 8)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.UpSampling2D(2),
        tf.keras.layers.Conv2D(16, 3, activation='relu'),
        tf.keras.layers.UpSampling2D(2),
        tf.keras.layers.Conv2D(8, 3, activation='relu'),
        tf.keras.layers.UpSampling2D(2),
        tf.keras.layers.Conv2D(4, 3, activation='relu'),
        tf.keras.layers.UpSampling2D(2),
        tf.keras.layers.Conv2D(2, 3, activation='relu'),
        tf.keras.layers.UpSampling2D(2),
        tf.keras.layers.Conv2D(1, 3, activation='relu'),
        tf.keras.layers.Dense(128, activation='relu', input_shape=(100,)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(100, activation='tanh'),
        tf.keras.layers.Flatten()
    ])
    return model

def discriminator():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu', input_shape=(100,)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    return model

# Load your audio data into a 3D array
audio_data = arr

# Flatten the 3D array into a 2D array
audio_data = audio_data.reshape(audio_data.shape[0], -1)

# Normalize the data
audio_data = (audio_data - np.mean(audio_data)) / np.std(audio_data)

train_data = audio_data[:int(0.8 * len(audio_data))]
test_data = audio_data[int(0.8 * len(audio_data)):]

# Create the generator and discriminator models
generator = generator()
discriminator = discriminator()

# Compile the discriminator model
discriminator.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
generator.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Freeze the discriminator weights to use as a fixed feature-extractor
discriminator.trainable = False

generator.fit(train_data,None,batch_size=32,epochs=1,verbose=1,validation_data=(test_data,test_data))

# Create the GAN model
gan_input = tf.keras.Input(shape=(100,))
generated_audio = generator(gan_input)
noise = np.random.normal(size=(10, 100))
synthetic_samples = generator.predict(noise)
generator.save("generator.h5")

ValueError: in user code:

    File "c:\Users\Nandhika\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\training.py", line 1249, in train_function  *
        return step_function(self, iterator)
    File "c:\Users\Nandhika\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\training.py", line 1233, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\Nandhika\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\training.py", line 1222, in run_step  **
        outputs = model.train_step(data)
    File "c:\Users\Nandhika\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\training.py", line 1023, in train_step
        y_pred = self(x, training=True)
    File "c:\Users\Nandhika\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "c:\Users\Nandhika\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\engine\input_spec.py", line 250, in assert_input_compatibility
        raise ValueError(

    ValueError: Exception encountered when calling layer 'sequential_65' (type Sequential).
    
    Input 0 of layer "conv2d_66" is incompatible with the layer: expected min_ndim=4, found ndim=2. Full shape received: (None, 16128)
    
    Call arguments received by layer 'sequential_65' (type Sequential):
      • inputs=tf.Tensor(shape=(None, 16128), dtype=float32)
      • training=True
      • mask=None


In [29]:
model = tf.keras.models.load_model('generator.h5')
model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_39 (Dense)            (None, 128)               12928     
                                                                 
 batch_normalization_33 (Bat  (None, 128)              512       
 chNormalization)                                                
                                                                 
 dense_40 (Dense)            (None, 256)               33024     
                                                                 
 batch_normalization_34 (Bat  (None, 256)              1024      
 chNormalization)                                                
                                                                 
 dense_41 (Dense)            (None, 512)               131584    
                                                                 
 batch_normalization_35 (Bat  (None, 512)             