# Define common functions used across different models and datasets

In [5]:
import logging

# Configure the logger
logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(message)s'
)


## Helper methods

In [2]:
def calculate_spectrogram_dimensions(sample_length, n_fft, hop_length, sr=16000):
    """
    Calculate the dimensions of the spectrogram without computing it.

    Parameters:
    samplerate (int): Sample rate of the audio file.
    sample_length (int): Length of the audio sample in seconds.
    n_fft (int): Number of FFT points.
    hop_length (int): Number of samples between successive frames.
    
    Returns:
    tuple: Dimensions of the spectrogram (frequency_bins, time_frames)
    """
    # Total number of samples in the audio
    total_samples = sr * sample_length
    
    # Number of frequency bins
    frequency_bins = n_fft // 2 + 1
    
    # Number of time frames
    time_frames = 1 + (total_samples - n_fft) // hop_length
    
    return frequency_bins, time_frames

## Extract specific features

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import librosa

def extract_MFCCs(y, sr, n_fft, hop_length, n_mfcc=128, show_plt=False) -> np.ndarray:    
    # MFCC — Mel-Frequency Cepstral Coefficients
    # This feature is one of the most important method to extract a feature of an audio signal and is used majorly whenever working on audio signals. The mel frequency cepstral coefficients (MFCCs) of a signal are a small set of features (usually about 10–20) which concisely describe the overall shape of a spectral envelope.
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)
    
    if show_plt:
        plt.figure(figsize=(14, 5))
        librosa.display.specshow(mfccs, sr=sr, x_axis='time', y_axis='mel', fmax=8000)
        plt.title('MFCC')
        plt.colorbar()
        plt.show() 
    
    return mfccs
    
def extract_stft_spectrogram(y, sr, hop_length, show_plt: bool,  n_fft=2048) -> np.ndarray:
    # Short Term Fourier Transform (STFT) converts signal such that we can know the amplitude of given frequency at a given time. 
    stft = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
    
    # You can think of a spectrogram as a bunch of FFTs stacked on top of each other. It is a way to visually represent a signal’s loudness, or amplitude, as it varies over time at different frequencies. There are some additional details going on behind the scenes when computing the spectrogram. The y-axis is converted to a log scale, and the color dimension is converted to decibels (you can think of this as the log scale of the amplitude). This is because humans can only perceive a very small and concentrated range of frequencies and amplitudes.
    spect = np.abs(stft)
    spect = librosa.amplitude_to_db(spect, ref=np.max) # Convert an amplitude spectrogram to dB-scaled spectrogram.
    
    if show_plt:
        plt.figure(figsize=(14, 5))
        librosa.display.specshow(spect, sr=sr, x_axis='time', y_axis='log')
        plt.title('Spectrogram')
        plt.colorbar(format='%+2.0f dB')
        plt.show() 
        
    return spect

def extract_mel_spectrogram(y, sr, hop_length, show_plt, n_fft=2048, n_mels=128) -> np.ndarray:
    # We are better at detecting differences in lower frequencies than higher frequencies.
    # A mel spectrogram is a spectrogram where the frequencies are converted to the mel scale.
    
    # Where-as the mel-spectrogram has mel filters applied which reduces the number of bands to n_mels (typically 32-128)
    mel_spect = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
    mel_spect = librosa.power_to_db(mel_spect, ref=np.max) # moze ref=np.min
    
    if show_plt:
        plt.figure(figsize=(14, 5))
        librosa.display.specshow(mel_spect, sr=sr, y_axis='mel', fmax=8000, x_axis='time')
        plt.title('Mel Spectrogram')
        plt.colorbar(format='%+2.0f dB')
        plt.show()
        
    return mel_spect

def extract_chroma_stft(y, sr, hop_length=2048, n_fft=2048, n_chroma=128) -> np.ndarray:
    stft = np.abs(librosa.stft(y, n_fft=n_fft, hop_length=hop_length))**2
    chroma_stft = librosa.feature.chroma_stft(S=stft, n_chroma=n_chroma, sr=sr)
    
    return chroma_stft

def extract_spectral_contrast(y, sr, hop_length=2048, n_fft=2048) -> np.ndarray:
    stft = np.abs(librosa.stft(y, n_fft=n_fft, hop_length=hop_length))
    spec_contr = librosa.feature.spectral_contrast(S=stft, sr=sr)
    
    return spec_contr

    
def print_waveform(y):
    plt.figure(figsize=(14, 5))
    plt.plot(y)
    plt.title('Signal')
    plt.xlabel('Time (samples)')
    plt.ylabel('Amplitude')
    plt.show()

## Extract all features

In [1]:
import logging
import numpy as np

def extract_features(samples, feature="Mel", sr=16000, hop_length=2048, n_fft=2048, n_mels=128, n_chroma=128, n_mfcc=128, show_logs=False, show_plt=False) -> np.ndarray:
    """Extracts features from audio samples.

    Args:
        samples: List of audio samples.
        sr: Sample rate (default: 16000 Hz).
        debug: Enable debug logging (default: False).
        show_plt: Whether to show matplotlib plot for feature
    Returns:
        Extracted features as a NumPy array.
    """
    
    features_length = len(samples)
    features = []

    for i, data in enumerate(samples):
        if feature == "Stft":
            spect = extract_stft_spectrogram(y=data, sr=sr, hop_length=hop_length, show_plt=show_plt, n_fft=n_fft)
        elif feature == "Mel":
            spect = extract_mel_spectrogram(y=data, sr=sr, hop_length=hop_length, show_plt=show_plt, n_fft=n_fft, n_mels=n_mels)    
        elif feature == "MFCC":
            spect = extract_MFCCs(y=data, sr=sr, show_plt=show_plt, n_mfcc=n_mfcc, hop_length=hop_length, n_fft=n_fft)          
        elif feature == "ChromaStft":
            spect = extract_chroma_stft(y=data, sr=sr,hop_length=hop_length, n_fft=n_fft, n_chroma=n_chroma)
        elif feature == "SpectralContrast":
            spect = extract_spectral_contrast(y=data, sr=sr, hop_length=hop_length, n_fft=n_fft)    
            
        features.append(spect)

        if show_logs and i % 50 == 0:
            logging.info(f"Extracted {i}/{features_length} features.")

    features = np.array(features)

    if show_logs:
        logging.info(f"Extracted features from {features_length} audio files.")
        logging.info(f"Features shape: {features.shape}")

    return features

## Onehot encode

In [3]:
from sklearn.preprocessing import OneHotEncoder

# Perform one-hot encoding for the labels convert to one-valued arr
def onehot_encode(labels: np.ndarray) -> (np.ndarray, dict):
    print("[INFO] Performing one-hot encoding on multiple nodes...")
    # Version 2 -> n nodes
    encoder = OneHotEncoder()
    labels = np.array(labels).reshape(-1, 1)
    labels_onehot = encoder.fit_transform(labels)
    labels_onehot = labels_onehot.astype(float) # convert to float
    labels_onehot = labels_onehot.toarray()
    
    # Display the mapping between original classes and one-hot encoded columns
    original_classes = encoder.categories_[0]
    mapping = {col_idx: class_label for col_idx, class_label in enumerate(original_classes)}
    print("Mapping between column indices and original classes:")
    print(mapping)
    
    print("Labels encoded using OneHotEncoder(), labels.shape:", labels_onehot.shape)
    
    return labels_onehot, mapping

## Create model

### Base CNN model

In [10]:
from keras.layers import LeakyReLU
from keras.src.layers import BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout

def create_base_cnn_model(feature_width, feature_height, channels, classes):
    # Build the CNN model
    model = Sequential([
        # This layer extracts 64 different 3x3 features
        # Typically you’ll see strides of 2×2 as a replacement to max pooling:
        Conv2D(64, (3, 3), input_shape=(feature_width, feature_height, channels), padding="same", activation=LeakyReLU(alpha=0.01)),
        # BatchNormalization(), # jak to skomentowalem to zniknal overfitting i nawet byl underfitting - moze zostawic jedno ale ogolnie lepiej bym poweidzial, mniej overfiitngu
        Dropout(0.25), # after adding it acc was much lower (7%)
        
        Conv2D(128, (3, 3), strides=(4, 4), padding="same", activation=LeakyReLU(alpha=0.01)),
        # BatchNormalization(),
        Dropout(0.25), # Dropout helps generalize and not overfit. Neurons from the current layer, with probability p, will randomly disconnect from neurons in the next layer so that the network has to rely on the existing connections.
         
        Conv2D(128, (3, 3), padding='same', activation=LeakyReLU(alpha=0.01)),
        # BatchNormalization(),
        Dropout(0.25),
        
        Conv2D(64, (3, 3), strides=(4, 4), padding='same', activation=LeakyReLU(alpha=0.01)),
        # BatchNormalization(),
        Dropout(0.25),
         
        Flatten(),
        
        #So you should add BN after your convolutions and then you should also remove DropOut. It has been studied by many researchers that Dropout is not needed if BN is used and BN performs actually better.
        
        # # Use ReLU to avoid vanishing gradients problem
        # next step -> RELU to LEAKY RELU 
        # The main advantage of using Leaky ReLU over normal ReLU is that it helps to overcome the "dying ReLU" problem. The dying ReLU problem occurs when the neuron stops responding to the input because its output is consistently negative due to the ReLU function. Leaky ReLU solves this problem by allowing a small positive output for negative inputs, which ensures that the neuron remains active even when the input is negative. Additionally, Leaky ReLU can help to improve the performance of deep neural networks by allowing them to learn more complex features and reducing the likelihood of overfitting.
        
        #Now, think about the chain rule in the backward pass. If the derivative of the slope of the ReLU is of 0, absolutely no learning is performed on the layers below the dead ReLU, because 0 will be multiplied to the accumulated gradient for the weight update. Thus, you can have dead neurons. This problem doesn’t happen with LReLU or ELU for example, they will always have a little slope to allow the gradients to flow on.
        

        # Dense(512, activation='relu'),
        Dense(512, activation=LeakyReLU(alpha=0.01)),
        BatchNormalization(),
        Dropout(0.25),
        
        # Dense(256, activation='relu'),
        Dense(256, activation=LeakyReLU(alpha=0.01)),
        BatchNormalization(), # adding this normalization (and removing dropout) resulted in model learning faster (on train_acc, val_acc also grows faster but with delay 3-4 epoch)
        Dropout(0.25),
        
        # In the Ioffe and Szegedy 2015, the authors state that "we would like to ensure that for any parameter values, the network always produces activations with the desired distribution". So the Batch Normalization Layer is actually inserted right after a Conv Layer/Fully Connected Layer, but before feeding into ReLu (or any other kinds of) activation
        
        # So in summary, the order of using batch normalization and dropout is:
        # -> CONV/FC -> BatchNorm -> ReLu(or other activation) -> Dropout -> CONV/FC ->
        
        
        #using a drop outs to avoid stepping to local minimums and labeling all samples to one class
    
        Dense(classes, activation='softmax'), # in general the best result are produced with softmax
        #sigmoid in some cases performs better and prevent stucking in the loss function local minimals
        # Categorical Classification: using softmax - multiple output nodes
        
        #If you have a multi-label classification problem = there is more than one "right answer" = the outputs are NOT mutually exclusive, then use a !!!sigmoid function!!! on each raw output independently. The sigmoid will allow you to have high probability for all of your classes, some of them, or none of them. Example: classifying diseases in a chest x-ray image. The image might contain pneumonia, emphysema, and/or cancer, or none of those findings.
        
        #If you have a multi-class classification problem = there is only one "right answer" = the outputs are mutually exclusive, then use a !!!softmax function!!!. The softmax will enforce that the sum of the probabilities of your output classes are equal to one, so in order to increase the probability of a particular class, your model must correspondingly decrease the probability of at least one of the other classes. Example: classifying images from the MNIST data set of handwritten digits. A single picture of a digit has only one true identity - the picture cannot be a 7 and an 8 at the same time.
    ])
    
    # Tldr; I’ve seen a good rule-of-thumb is about 14-18x times the model size for memory limits, so for a 10GB card, training your model would max out memory at roughly 540M parameters.
    model.summary()
    print(f"[INFO] Using My CNN model.")
    
    return model

## Base CNN ale lepsza

In [7]:
from keras.layers import LeakyReLU
from keras.src.layers import BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout

# Tldr; I’ve seen a good rule-of-thumb is about 14-18x times the model size for memory limits, so for a 10GB card, training your model would max out memory at roughly 540M parameters.
def create_better_base_cnn_model(feature_width, feature_height, channels, classes):
    # Build the CNN model
    model = Sequential([
        # This layer extracts 64 different 3x3 features
        # Typically you’ll see strides of 2×2 as a replacement to max pooling:
        Conv2D(64, (3, 3), input_shape=(feature_width, feature_height, channels), padding="same", activation=LeakyReLU(alpha=0.1), kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        Conv2D(64, (3, 3), strides=(4, 4), padding="same", activation=LeakyReLU(alpha=0.1), kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        Dropout(0.3),
         
        Conv2D(64, (3, 3), padding='same', activation=LeakyReLU(alpha=0.1), kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        Conv2D(64, (3, 3), strides=(4, 4), padding='same', activation=LeakyReLU(alpha=0.1), kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        Dropout(0.4),
        
        Flatten(),
        
        Dense(512, kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        Dropout(0.3), 
        
        Dense(256, kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        Dropout(0.3),
        
        Dense(classes, activation='softmax'),
    ])

    model.summary()
    print(f"[INFO] Using My Better CNN model.")
    
    return model

## Use Resnet50

In [1]:
import tensorflow as tf
from keras.layers import LeakyReLU
from keras.src.layers import BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, GlobalAveragePooling2D

def create_resnet50_model(feature_width, feature_height, classes):
    base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False, input_shape=(feature_width, feature_height, 3)) #pre proces returns 3 channels output
    print(f"[INFO] Using Resnet50 model with {len(base_model.layers)} base layers")

    # Freeze the layers of the base model
    for layer in base_model.layers:
        layer.trainable = False
        
    # Add custom layers on top of ResNet50
    x = base_model.output
    # x = Flatten()(x)
    x = GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dense(1024, activation = 'relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dropout(0.3)(x) # z 0.5, acc wynislo=
    x = tf.keras.layers.Dense(512, activation = 'relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dropout(0.4)(x) # z 0.5, acc wynislo=
    predictions = tf.keras.layers.Dense(classes, activation = 'softmax')(x)

    # Create the model
    model = Model(inputs=base_model.input, outputs=predictions)


    for layer in model.layers[:175]:
        layer.trainable = False
    
    for layer in model.layers[175:]:
        layer.trainable = True
          
    model.summary()
    print(f"[INFO] Using Resnet50 model with {len(base_model.layers)} base layers")
    print(f"[INFO] Resnet50 customized has a {len(model.layers)} layers")
    
    return model

2024-08-11 16:06:19.872083: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-08-11 16:06:19.872124: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-08-11 16:06:19.872248: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-08-11 16:06:19.888688: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Model evaluation

In [9]:
from scipy.interpolate import make_interp_spline
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, cohen_kappa_score, ConfusionMatrixDisplay
import os

def evaluate_model(H, y_pred, y_test, n_of_epochs, show_confusion_matrix=True, interpolate=True, plot_file_name="plot.png"):
    print("[INFO] Evaluating model...")

    print(f'Type and shape of y_test: {type(y_test)} | {y_test.shape}')
    print(f'Type and shape of y_pred: {type(y_pred)} | {y_pred.shape}')
    
    print('\nConfusion matrix:') 
    print(confusion_matrix(y_test, y_pred)) 
    
    conf_matrix = confusion_matrix(y_test, y_pred)
    
    if show_confusion_matrix:
        disp = ConfusionMatrixDisplay(conf_matrix)
        
        fig, ax = plt.subplots(figsize=(15,15))
        plt.title("Confusion Matrix")
        plt.xlabel("Predicted")
        plt.ylabel("True")
        ax.grid(False)
        disp.plot(ax=ax)
  
    print(classification_report(y_test, y_pred, zero_division=1))
    print(f"Cohen's Kappa: {cohen_kappa_score(y_test, y_pred)*100:.1f}%")
    print(f"Accuracy: {accuracy_score(y_test, y_pred)*100:.1f}%")
    
    if H is not None:
        # plots the training loss and accuracy
        epochs_x = np.arange(0, n_of_epochs)
        X_ = np.linspace(epochs_x.min(), epochs_x.max(), 250)
        
        plt.style.use("ggplot")
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 16))
        
        # First subplot
        plt.subplot(2, 1, 1)
        if interpolate:
            ax1.plot(X_, return_interpolated_plot(H.history["loss"][:n_of_epochs], epochs_x, X_), label="train_loss")
            ax1.plot(X_, return_interpolated_plot(H.history["val_loss"][:n_of_epochs], epochs_x, X_), label="val_loss")
        else:
            ax1.plot(epochs_x, H.history["loss"][:n_of_epochs], label="train_loss")
            ax1.plot(epochs_x, H.history["val_loss"][:n_of_epochs], label="val_loss")
        ax1.set_title("Training and Validation Loss on Dataset")
        ax1.set_xlabel("Epoch #")
        ax1.set_ylabel("Loss")
        ax1.legend(loc="lower left")
        
        # Second subplot
        plt.subplot(2, 1, 2)
        if interpolate:
            ax2.plot(X_, return_interpolated_plot(H.history["accuracy"][:n_of_epochs], epochs_x, X_), label="train_acc")
            ax2.plot(X_, return_interpolated_plot(H.history["val_accuracy"][:n_of_epochs], epochs_x, X_), label="val_acc")
        else:
            ax2.plot(epochs_x, H.history["accuracy"][:n_of_epochs], label="train_acc")
            ax2.plot(epochs_x, H.history["val_accuracy"][:n_of_epochs], label="val_acc")
        ax2.set_title("Training and Validation Accuracy on Dataset")
        ax2.set_xlabel("Epoch #")
        ax2.set_ylabel("Accuracy")
        ax2.legend(loc="lower left")
        
        print(f"[INFO] Saving training/validation plot as a {plot_file_name}-{accuracy_score(y_test, y_pred)*100:.2f}acc.png")

        plt.savefig(f"{plot_file_name}-{accuracy_score(y_test, y_pred)*100:.1f}acc.png")
        
        plt.show()
        
    print("[INFO] Finished model evaluation")    
    
def return_interpolated_plot(y, x, X_):
    X_Y_Spline = make_interp_spline(x , y)
    Y_ = X_Y_Spline(X_)
 
    return Y_