In [1]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf 
import tensorflow.keras.backend as K  
from sklearn.model_selection import train_test_split

In [2]:
tf.__version__

'2.7.0'

In [3]:
seed = 123
tf.random.set_seed(seed)
np.random.seed(seed)

In [4]:
DATA_PATH = '../data/spcup_2022_training_part1'
Labeled_dir = '../data/spcup_2022_training_part1/labels.csv'

In [6]:
class SpectDataset:

    def __init__(self):
        self.AUTOTUNE = tf.data.AUTOTUNE

    def decode_audio(self, audio_binary):
        # Decode WAV-encoded audio files to `float32` tensors, normalized
        # to the [-1.0, 1.0] range. Return `float32` audio and a sample rate.
        audio, _ = tf.audio.decode_wav(contents=audio_binary)
        # Since all the data is single channel (mono), drop the `channels`
        # axis from the array.
        return tf.squeeze(audio, axis=-1)

    def get_waveform(self, file_path):
        cpath = self.DATA_PATH + os.sep + file_path
        audio_binary = tf.io.read_file(cpath)
        waveform = self.decode_audio(audio_binary)
        return waveform

    def waveform_mapper(self, ds):
        return ds.map(
                # map_func=lambda x: tf.py_function(func=self.get_waveform, inp=[x], Tout=(tf.float32, tf.int64)),
                # map_func=lambda x,y: (tf.py_function(self.get_waveform, [x], tf.float32), y),
                map_func=lambda x,y: (self.get_waveform(x), y),
                num_parallel_calls=self.AUTOTUNE)

    def get_spectrogram(self, waveform):
        # Zero-padding for an audio waveform with less than 16,000 samples.
        input_len = self.sptr_len
        waveform = waveform[:input_len]
        zero_padding = tf.zeros(
            [input_len] - tf.shape(waveform),
            dtype=tf.float32)
        # Cast the waveform tensors' dtype to float32.
        waveform = tf.cast(waveform, dtype=tf.float32)
        # Concatenate the waveform with `zero_padding`, which ensures all audio
        # clips are of the same length.
        equal_length = tf.concat([waveform, zero_padding], 0)
        # Convert the waveform to a spectrogram via a STFT.
        spectrogram = tf.signal.stft(
            equal_length, frame_length=255, frame_step=128)
        # Obtain the magnitude of the STFT.
        spectrogram = tf.abs(spectrogram)
        # Add a `channels` dimension, so that the spectrogram can be used
        # as image-like input data with convolution layers (which expect
        # shape (`batch_size`, `height`, `width`, `channels`).
        spectrogram = spectrogram[..., tf.newaxis]
        return spectrogram

    def spectrogram_mapper(self, ds):
        return ds.map(
                # map_func=lambda x: tf.py_function(func=self.get_spectrogram, inp=[x], Tout=(tf.float32, tf.int64)),
                # map_func=lambda x,y: (tf.py_function(self.get_spectrogram, [x], tf.float32), y),
                map_func=lambda x,y: (self.get_spectrogram(x), y),
                num_parallel_calls=self.AUTOTUNE)

    def load_dataset(self, path, split_ratio):
        label_df = pd.read_csv(path)
        X, y = label_df['track'].values, label_df['algorithm'].values
        # stratified split dataset into train-validation
        X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=split_ratio)

        X_train = tf.convert_to_tensor(X_train)
        y_train = tf.convert_to_tensor(y_train)
        X_test = tf.convert_to_tensor(X_test)
        y_test = tf.convert_to_tensor(y_test)

        primary_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train))
        prm_val_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test))
        return primary_ds, prm_val_ds
    
    def call(self, data_path, label_path, sptr_len=16000, BUFFER_SIZE=32000, BATCH_SIZE=32, split_raio=0.2, is_cache=True, is_prefetch=True):
        self.sptr_len = sptr_len
        self.DATA_PATH = data_path
        train_ds, val_ds = self.load_dataset(label_path, split_raio)
        train_ds, val_ds = self.waveform_mapper(train_ds), self.waveform_mapper(val_ds)
        train_ds, val_ds = self.spectrogram_mapper(train_ds), self.spectrogram_mapper(val_ds)

        train_dataset = train_ds.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=False)
        val_dataset = val_ds.batch(BATCH_SIZE, drop_remainder=False)

        if is_cache:
            train_dataset, val_dataset = train_dataset.cache(), val_dataset.cache()
        
        if is_prefetch:
            train_dataset, val_dataset = train_dataset.prefetch(self.AUTOTUNE), val_dataset.prefetch(self.AUTOTUNE)
        
        return train_dataset, val_dataset

In [7]:
dataloader = SpectDataset()
train_ds, val_ds = dataloader.call(DATA_PATH, Labeled_dir)

In [8]:
train_ds.element_spec

(TensorSpec(shape=(None, None, 129, 1), dtype=tf.float32, name=None),
 TensorSpec(shape=(None,), dtype=tf.int64, name=None))

test the dataset on sample model

In [9]:
from tensorflow.keras import layers  
from tensorflow.keras import models  

In [11]:
for spectrogram, _ in train_ds.take(1):
  input_shape = spectrogram.shape
print('Input shape:', input_shape)

Input shape: (32, 124, 129, 1)


In [12]:
# Instantiate the `tf.keras.layers.Normalization` layer.
norm_layer = layers.Normalization()
# Fit the state of the layer to the spectrograms
# with `Normalization.adapt`.
norm_layer.adapt(data=train_ds.map(map_func=lambda spec, label: spec))

model = models.Sequential([
    layers.Input(shape=input_shape[1:]),
    # Downsample the input.
    layers.Resizing(32, 32),
    # Normalize.
    # norm_layer,
    layers.Conv2D(32, 3, activation='relu'),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(5),
])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resizing (Resizing)         (None, 32, 32, 1)         0         
                                                                 
 conv2d (Conv2D)             (None, 30, 30, 32)        320       
                                                                 
 conv2d_1 (Conv2D)           (None, 28, 28, 64)        18496     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 14, 14, 64)       0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 14, 14, 64)        0         
                                                                 
 flatten (Flatten)           (None, 12544)             0         
                                                        

In [13]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

In [14]:
EPOCHS = 10
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2),
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 00010: early stopping
