## Importing libraries

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.utils import audio_dataset_from_directory
import numpy as np
import librosa

#tf.compat.v1.disable_eager_execution()
#import tensorflow_io as tfio

## Downloading and splitting data

In [None]:
#Getting the input data from drive (eins, zwei, drei folders)
!gdown 10P678fWDyAJIRv_HlqsXtS2u68NTFZ7I
!unzip data_cnn.zip

In [None]:

# Folder created by unzipping the archive above.
data_dir = "/content/data_cnn"

# Fixed-seed 80/20 split. batch_size=None yields individual (audio, label)
# examples instead of batches; subset='both' returns both splits at once.
train_ds, val_ds = audio_dataset_from_directory(
    data_dir,
    subset='both',
    validation_split=0.2,
    seed=0,
    batch_size=None,
)

Found 526 files belonging to 3 classes.
Using 421 files for training.
Using 105 files for validation.


## Preprocessing data

In [None]:
def preprocess_audio_mfps(audio, label, sample_rate=16000):
    """Convert one waveform into a mel-scaled magnitude spectrogram.

    Only graph-compatible ``tf.signal`` ops are used, so the function can be
    passed to ``Dataset.map`` directly without a ``tf.py_function`` wrapper
    (which would run Python per element and discard static shape information).

    Args:
        audio: Waveform tensor for a single clip. It is flattened to 1-D before
            the STFT, so any extra axis (e.g. a channel axis) is folded into
            the sample axis.
        label: Passed through unchanged.
        sample_rate: Sample rate in Hz used to place the mel filters. Defaults
            to 16000, the value that was previously hard-coded.

    Returns:
        Tuple ``(mel_spectrogram, label)``; the spectrogram is float32 with
        shape ``(num_frames, 128)``.
    """
    frame_length, frame_step = 1024, 512
    lower_edge_hertz, upper_edge_hertz, num_mel_bins = 80.0, 7600.0, 128

    audio = tf.cast(audio, tf.float32)
    # NOTE(review): the loader presumably already yields floats in [-1, 1], in
    # which case this is only a constant rescale rather than int16
    # normalisation. It is kept so the features stay numerically unchanged -
    # confirm before removing.
    audio = audio / 32768.0

    # tf.signal.stft frames along the last axis; flatten to a single 1-D signal.
    audio = tf.reshape(audio, [-1])

    stfts = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=frame_length)
    # Magnitude (not squared) spectrogram.
    spectrograms = tf.abs(stfts)

    # The frequency-bin count is static because fft_length is a constant.
    num_spectrogram_bins = stfts.shape[-1]
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins, num_spectrogram_bins, sample_rate, lower_edge_hertz, upper_edge_hertz)

    # (frames, bins) x (bins, mel_bins) -> (frames, mel_bins)
    mel_spectra = tf.tensordot(spectrograms, linear_to_mel_weight_matrix, 1)

    return mel_spectra, label

In [None]:
# Run the mel-spectrogram preprocessing over both splits.
train_processed_mspec, val_processed_mspec = (
    split.map(preprocess_audio_mfps) for split in (train_ds, val_ds)
)

In [None]:

# Longest spectrogram (first-axis length) in the training split; this becomes
# the common length every sequence is padded to.
max_length = max(
    map(lambda example: len(example[0]), train_processed_mspec.as_numpy_iterator())
)

# Function to pad sequences
def pad_sequence(seq, label):
    """Bring a 2-D spectrogram to exactly ``max_length`` rows.

    Args:
        seq: Tensor whose first axis is the sequence axis.
        label: Passed through unchanged.

    Returns:
        Tuple ``(padded_seq, label)``; ``padded_seq`` has ``max_length`` rows,
        zero-padded at the end or truncated as required.
    """
    # Clip first: max_length comes from the training split only, so another
    # split can contain a longer clip. Without clipping the padding amount
    # would be negative, which tf.pad rejects.
    seq = seq[:max_length]
    missing_rows = max_length - tf.shape(seq)[0]
    padded_seq = tf.pad(seq, paddings=[[0, missing_rows], [0, 0]])
    return padded_seq, label

# Pad the training and validation splits to the common sequence length.
padded_train_ds_mfps, padded_val_ds_mfps = (
    split.map(pad_sequence) for split in (train_processed_mspec, val_processed_mspec)
)

In [None]:
import numpy as np

def create_pairs_and_labels(padded_dataset):
    """Collect an iterable of (sequence, label) items into two NumPy arrays.

    Despite the name, no pairing happens here: the first returned array simply
    stacks the sequences in iteration order and the second stacks their labels.

    Args:
        padded_dataset: Iterable yielding ``(sequence, label)`` tuples. It is
            traversed exactly once.

    Returns:
        Tuple ``(pairs, labels)`` of NumPy arrays.
    """
    # Single pass over the input, so one-shot iterables are handled too.
    examples = [(sequence, label) for sequence, label in padded_dataset]

    pairs = np.array([sequence for sequence, _ in examples])
    labels = np.array([label for _, label in examples])
    return pairs, labels

# Pull both padded datasets into memory as (features, labels) arrays.
(train_data, train_labels), (val_data, val_labels) = map(
    create_pairs_and_labels, (padded_train_ds_mfps, padded_val_ds_mfps)
)


## Creating pairs input for siamese network

In [None]:
# X: array of examples indexed along the first axis, shape (num_samples, ...)
# y: class label of each example, shape (num_samples,)

# Function to create pairs of data and labels
def create_pairs(X, y, num_pairs):
    """Randomly sample labelled example pairs for Siamese training.

    For every pair a class is drawn uniformly, then a random example of that
    class. A coin flip decides whether the partner comes from the same class
    (label 1) or from a different, uniformly chosen class (label 0).

    Args:
        X: Indexable collection of examples; ``X[i]`` belongs to class ``y[i]``.
        y: Class label per example. Any set of distinct label values works;
            they do not have to be the integers ``0..K-1``.
        num_pairs: Number of pairs to draw.

    Returns:
        Tuple ``(pairs, labels)``: ``pairs`` is ``np.array`` of
        ``[first, second]`` examples, ``labels`` is ``np.array`` of 0/1 flags.

    Raises:
        ValueError: If pairs are requested but ``y`` holds fewer than two
            classes, because different-class pairs cannot be formed.
    """
    y = np.asarray(y)
    classes = np.unique(y)
    num_classes = len(classes)
    if num_pairs > 0 and num_classes < 2:
        raise ValueError(
            "create_pairs needs at least two distinct classes to build "
            "different-class pairs; got %d" % num_classes)

    # Look indices up by actual label value. Enumerating range(num_classes)
    # would produce empty index sets whenever labels are not exactly 0..K-1.
    class_indices = [np.where(y == class_value)[0] for class_value in classes]

    pairs = []
    labels = []
    for _ in range(num_pairs):
        class_idx = np.random.randint(0, num_classes)
        idx_1 = np.random.choice(class_indices[class_idx])
        should_be_same_class = np.random.randint(0, 2)
        if should_be_same_class:
            # The partner may be the very same example as idx_1.
            idx_2 = np.random.choice(class_indices[class_idx])
        else:
            # An offset in [1, num_classes - 1] guarantees a different class.
            class_idx_2 = (class_idx + np.random.randint(1, num_classes)) % num_classes
            idx_2 = np.random.choice(class_indices[class_idx_2])
        pairs.append([X[idx_1], X[idx_2]])
        # 1 if same class, 0 if different class
        labels.append(1 if should_be_same_class else 0)

    return np.array(pairs), np.array(labels)

# Pair budgets for the two splits; adjust to dataset size and requirements.
num_pairs, num_pairs_val = 1500, 500

# Training pairs are drawn first, then validation pairs.
train_pairs, train_pairs_labels = create_pairs(train_data, train_labels, num_pairs)
val_pairs, val_pairs_labels = create_pairs(val_data, val_labels, num_pairs_val)

In [None]:
# Shape check: axis 0 indexes the pairs, axis 1 the two members of each pair;
# the remaining axes are the shape of a single example.
train_pairs.shape

(1500, 2, 402, 128)

In [None]:
# Peek at the pair labels (1 = same class, 0 = different class).
train_pairs_labels

array([0, 0, 0, ..., 1, 0, 0])

## Creating model architecture

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, BatchNormalization, MaxPooling1D, GlobalAveragePooling1D, Dense, Lambda
from tensorflow.keras import backend as K
# Define Siamese network architecture
def siamese_model(input_shape):
    """Build the embedding tower that is applied to each input of the pair.

    Architecture: five stages of (Conv1D -> BatchNorm) x 2 followed by 2x max
    pooling, with 16, 32, 64, 128 and 256 filters; then single Conv1D ->
    BatchNorm layers with 512 and 1024 filters; finally global average pooling
    over the sequence axis.

    Args:
        input_shape: Shape of one example, excluding the batch axis.

    Returns:
        A Keras ``Model`` mapping an input of ``input_shape`` to a
        1024-dimensional vector.
    """
    def conv_bn(tensor, filters):
        # 3-wide 'same' convolution with ReLU, then batch normalisation.
        tensor = Conv1D(filters, kernel_size=3, activation='relu', padding='same')(tensor)
        return BatchNormalization()(tensor)

    inputs = Input(shape=input_shape)
    features = inputs

    # Pooled stages: two conv/BN layers at the same width, then halve the length.
    for filters in (16, 32, 64, 128, 256):
        features = conv_bn(features, filters)
        features = conv_bn(features, filters)
        features = MaxPooling1D(pool_size=2, strides=2)(features)

    # Two wider layers without pooling.
    for filters in (512, 1024):
        features = conv_bn(features, filters)

    embedding = GlobalAveragePooling1D()(features)
    return Model(inputs, embedding)

# Define cosine similarity function
def cosine_similarity(vectors):
    """Cosine similarity between two batches of vectors.

    Args:
        vectors: Two-element sequence ``(x, y)`` of tensors.

    Returns:
        Tensor holding the dot product of the L2-normalised inputs, reduced
        over the last axis with that axis kept (size 1).
    """
    first_unit = K.l2_normalize(vectors[0], axis=-1)
    second_unit = K.l2_normalize(vectors[1], axis=-1)
    elementwise = first_unit * second_unit
    return K.sum(elementwise, axis=-1, keepdims=True)

# Define contrastive loss function
def contrastive_loss(y_true, y_pred, margin=1.0):
    """Contrastive loss for a network whose output is a cosine similarity.

    The similarity is first turned into a distance ``d = 1 - y_pred``:

    * same-class pairs (``y_true == 1``) are penalised by ``d ** 2``;
    * different-class pairs (``y_true == 0``) are penalised by
      ``max(margin - d, 0) ** 2``, i.e. only while they are closer than
      ``margin``.

    The hinge has to act on the distance. Applying it to the similarity as
    ``max(y_pred - margin, 0)`` with ``margin = 1`` is always zero, because a
    cosine similarity never exceeds 1, so different-class pairs would never
    contribute to the loss.

    Args:
        y_true: 1 for same-class pairs, 0 for different-class pairs.
        y_pred: Predicted cosine similarity.
        margin: Distance below which different-class pairs are penalised.

    Returns:
        Scalar mean loss over all elements.
    """
    # Align labels with the prediction's dtype and shape so the product below
    # is elementwise and cannot broadcast (batch,) against (batch, 1).
    y_true = K.reshape(K.cast(y_true, K.dtype(y_pred)), K.shape(y_pred))

    distance = 1.0 - y_pred
    same_class_term = y_true * K.square(distance)
    different_class_term = (1.0 - y_true) * K.square(K.maximum(margin - distance, 0.0))
    return K.mean(same_class_term + different_class_term)

# Create Siamese model
# Per-example input shape, read from the pair array that is actually fed to
# the network (axes 0 and 1 index the pair and its two members). This avoids a
# hard-coded length that silently goes stale when the data changes.
input_shape = tuple(train_pairs.shape[2:])
base_model = siamese_model(input_shape)

input_a = Input(shape=input_shape)
input_b = Input(shape=input_shape)

# The same base_model instance handles both inputs, so its weights are shared.
processed_a = base_model(input_a)
processed_b = base_model(input_b)

# Single similarity score per pair.
cosine_sim = Lambda(cosine_similarity, output_shape=(1,))([processed_a, processed_b])

siamese_network = Model(inputs=[input_a, input_b], outputs=cosine_sim)

# Compile the Siamese model with contrastive loss
siamese_network.compile(optimizer='adam', loss=contrastive_loss, metrics=['accuracy'])

# Summary of the Siamese model
siamese_network.summary()


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 402, 128)]           0         []                            
                                                                                                  
 input_3 (InputLayer)        [(None, 402, 128)]           0         []                            
                                                                                                  
 model (Functional)          (None, 1024)                 2377312   ['input_2[0][0]',             
                                                                     'input_3[0][0]']             
                                                                                                  
 lambda (Lambda)             (None, 1)                    0         ['model[0][0]',         

In [None]:

# Cast the 0/1 pair labels to float32 (not integers) for the float arithmetic
# in the loss.
train_pairs_labels, val_pairs_labels = (
    pair_labels.astype(np.float32) for pair_labels in (train_pairs_labels, val_pairs_labels)
)


In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

batch_size = 32

# Keep the weights with the highest validation accuracy on disk.
checkpoint_callback = ModelCheckpoint(
    filepath='best_model.h5',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1,
)
# Stop after 5 epochs without a val_loss improvement and restore the best-loss weights.
# NOTE(review): the checkpoint tracks val_accuracy while early stopping tracks
# val_loss, so the file on disk and the in-memory weights can come from
# different epochs.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

# Index 0 / 1 along axis 1 selects the first / second member of every pair.
train_inputs = [train_pairs[:, 0], train_pairs[:, 1]]
val_inputs = [val_pairs[:, 0], val_pairs[:, 1]]

history = siamese_network.fit(
    x=train_inputs,
    y=train_pairs_labels,
    batch_size=batch_size,
    epochs=50,
    validation_data=(val_inputs, val_pairs_labels),
    callbacks=[checkpoint_callback, early_stopping_callback],
)

Train on 1500 samples, validate on 500 samples
Epoch 1/50

  updates = self.state_updates



Epoch 1: val_accuracy improved from -inf to 0.51200, saving model to best_model.h5


  saving_api.save_model(


Epoch 2/50
Epoch 2: val_accuracy did not improve from 0.51200
Epoch 3/50
Epoch 3: val_accuracy did not improve from 0.51200
Epoch 4/50
Epoch 4: val_accuracy did not improve from 0.51200
Epoch 5/50
Epoch 5: val_accuracy improved from 0.51200 to 0.51400, saving model to best_model.h5
Epoch 6/50
Epoch 6: val_accuracy did not improve from 0.51400
Restoring model weights from the end of the best epoch: 1.
Epoch 6: early stopping


## Training with a larger number of pairs

In [None]:


# X: array of examples indexed along the first axis, shape (num_samples, ...)
# y: class label of each example, shape (num_samples,)

# Function to create pairs of data and labels
def create_pairs(X, y, num_pairs):
    """Randomly sample labelled example pairs for Siamese training.

    For every pair a class is drawn uniformly, then a random example of that
    class. A coin flip decides whether the partner comes from the same class
    (label 1) or from a different, uniformly chosen class (label 0).

    Args:
        X: Indexable collection of examples; ``X[i]`` belongs to class ``y[i]``.
        y: Class label per example. Any set of distinct label values works;
            they do not have to be the integers ``0..K-1``.
        num_pairs: Number of pairs to draw.

    Returns:
        Tuple ``(pairs, labels)``: ``pairs`` is ``np.array`` of
        ``[first, second]`` examples, ``labels`` is ``np.array`` of 0/1 flags.

    Raises:
        ValueError: If pairs are requested but ``y`` holds fewer than two
            classes, because different-class pairs cannot be formed.
    """
    y = np.asarray(y)
    classes = np.unique(y)
    num_classes = len(classes)
    if num_pairs > 0 and num_classes < 2:
        raise ValueError(
            "create_pairs needs at least two distinct classes to build "
            "different-class pairs; got %d" % num_classes)

    # Look indices up by actual label value. Enumerating range(num_classes)
    # would produce empty index sets whenever labels are not exactly 0..K-1.
    class_indices = [np.where(y == class_value)[0] for class_value in classes]

    pairs = []
    labels = []
    for _ in range(num_pairs):
        class_idx = np.random.randint(0, num_classes)
        idx_1 = np.random.choice(class_indices[class_idx])
        should_be_same_class = np.random.randint(0, 2)
        if should_be_same_class:
            # The partner may be the very same example as idx_1.
            idx_2 = np.random.choice(class_indices[class_idx])
        else:
            # An offset in [1, num_classes - 1] guarantees a different class.
            class_idx_2 = (class_idx + np.random.randint(1, num_classes)) % num_classes
            idx_2 = np.random.choice(class_indices[class_idx_2])
        pairs.append([X[idx_1], X[idx_2]])
        # 1 if same class, 0 if different class
        labels.append(1 if should_be_same_class else 0)

    return np.array(pairs), np.array(labels)

# Larger pair budgets for this run; adjust to dataset size and requirements.
num_pairs, num_pairs_val = 4000, 1000

# Training pairs are drawn first, then validation pairs.
train_pairs, train_pairs_labels = create_pairs(train_data, train_labels, num_pairs)
val_pairs, val_pairs_labels = create_pairs(val_data, val_labels, num_pairs_val)

# Cast the 0/1 pair labels to float32 (not integers).
train_pairs_labels, val_pairs_labels = (
    pair_labels.astype(np.float32) for pair_labels in (train_pairs_labels, val_pairs_labels)
)


In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

batch_size = 32

# Checkpoint for this run goes to its own file, selected by validation accuracy.
checkpoint_callback = ModelCheckpoint(
    filepath='best_model_5k.h5',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1,
)
# Early stopping watches val_loss (patience 5) and restores the best-loss weights.
# NOTE(review): the checkpoint and early stopping monitor different metrics.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

# Split each pair array into its two branch inputs.
first_train, second_train = train_pairs[:, 0], train_pairs[:, 1]
first_val, second_val = val_pairs[:, 0], val_pairs[:, 1]

history = siamese_network.fit(
    x=[first_train, second_train],
    y=train_pairs_labels,
    epochs=50,
    batch_size=batch_size,
    callbacks=[checkpoint_callback, early_stopping_callback],
    validation_data=([first_val, second_val], val_pairs_labels),
)

Epoch 1/50
Epoch 1: val_accuracy improved from -inf to 0.48500, saving model to best_model_5k.h5


  saving_api.save_model(


Epoch 2/50
Epoch 2: val_accuracy did not improve from 0.48500
Epoch 3/50
Epoch 3: val_accuracy did not improve from 0.48500
Epoch 4/50
Epoch 4: val_accuracy did not improve from 0.48500
Epoch 5/50
Epoch 5: val_accuracy improved from 0.48500 to 0.77600, saving model to best_model_5k.h5
Epoch 6/50
Epoch 6: val_accuracy improved from 0.77600 to 0.81300, saving model to best_model_5k.h5
Restoring model weights from the end of the best epoch: 1.
Epoch 6: early stopping


In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

batch_size = 32

# NOTE(review): this writes to 'best_model.h5', the same path used by the
# first training cell, so that earlier checkpoint is replaced.
checkpoint_callback = ModelCheckpoint(
    filepath='best_model.h5',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1,
)
# Halt once val_loss has not improved for 5 epochs; roll back to the best-loss weights.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

training_callbacks = [checkpoint_callback, early_stopping_callback]
# Validation tuple: ([first members, second members], labels).
validation_pairs = ([val_pairs[:, 0], val_pairs[:, 1]], val_pairs_labels)

history = siamese_network.fit(
    [train_pairs[:, 0], train_pairs[:, 1]],
    train_pairs_labels,
    validation_data=validation_pairs,
    callbacks=training_callbacks,
    batch_size=batch_size,
    epochs=50,
)

Epoch 1/50
Epoch 1: val_accuracy improved from -inf to 0.52700, saving model to best_model.h5
Epoch 2/50


  saving_api.save_model(


Epoch 2: val_accuracy did not improve from 0.52700
Epoch 3/50
Epoch 3: val_accuracy improved from 0.52700 to 0.53300, saving model to best_model.h5
Epoch 4/50
Epoch 4: val_accuracy improved from 0.53300 to 0.76700, saving model to best_model.h5
Epoch 5/50
Epoch 5: val_accuracy did not improve from 0.76700
Epoch 6/50
Epoch 6: val_accuracy did not improve from 0.76700
Epoch 7/50
Epoch 7: val_accuracy did not improve from 0.76700
Restoring model weights from the end of the best epoch: 2.
Epoch 7: early stopping


In [None]:
# run on 02-06-24
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

batch_size = 32

# Same callback setup as the earlier runs: save on best val_accuracy, stop on
# stalled val_loss (patience 5) and restore the best-loss weights.
# NOTE(review): 'best_model.h5' is shared with earlier training cells, so this
# run overwrites their checkpoint.
checkpoint_callback = ModelCheckpoint(
    filepath='best_model.h5',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1,
)
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

fit_options = dict(
    epochs=50,
    batch_size=batch_size,
    validation_data=([val_pairs[:, 0], val_pairs[:, 1]], val_pairs_labels),
    callbacks=[checkpoint_callback, early_stopping_callback],
)

# Inputs are the first and second member of every training pair.
history = siamese_network.fit(
    [train_pairs[:, 0], train_pairs[:, 1]],
    train_pairs_labels,
    **fit_options,
)

Epoch 1/50
Epoch 1: val_accuracy improved from -inf to 0.49900, saving model to best_model.h5


  saving_api.save_model(


Epoch 2/50
Epoch 2: val_accuracy did not improve from 0.49900
Epoch 3/50
Epoch 3: val_accuracy did not improve from 0.49900
Epoch 4/50
Epoch 4: val_accuracy did not improve from 0.49900
Epoch 5/50
Epoch 5: val_accuracy improved from 0.49900 to 0.67400, saving model to best_model.h5
Epoch 6/50
Epoch 6: val_accuracy improved from 0.67400 to 0.70400, saving model to best_model.h5
Restoring model weights from the end of the best epoch: 1.
Epoch 6: early stopping


In [None]:
import soundfile as sf
import audioread
import numpy as np

# Function to load M4A file
def load_m4a(filename):
    """Decode an audio file with audioread into int16 PCM samples.

    Args:
        filename: Path of the file to decode.

    Returns:
        Tuple ``(data, samplerate)``. ``data`` is a 1-D int16 array for
        single-channel input and a ``(num_frames, num_channels)`` int16 array
        otherwise; ``samplerate`` is the rate reported by audioread.
    """
    with audioread.audio_open(filename) as f:
        samplerate = f.samplerate
        channels = f.channels
        # Join the raw buffers before decoding so that a sample split across
        # two buffers is still read correctly.
        raw = b"".join(bytes(chunk) for chunk in f)

    # NOTE(review): assumes audioread yields 16-bit little-endian PCM with
    # interleaved channels - confirm against the audioread documentation.
    usable_bytes = len(raw) - (len(raw) % 2)
    data = np.frombuffer(raw[:usable_bytes], dtype='<i2').astype(np.int16)

    if channels > 1:
        # De-interleave: without this a stereo file is written out as one
        # mono track of twice the length.
        whole_frames = len(data) - (len(data) % channels)
        data = data[:whole_frames].reshape(-1, channels)

    return data, samplerate

# Decode the M4A recording into PCM samples plus its sample rate.
data, samplerate = load_m4a("/content/Anse.m4a")

# Write the samples back out as a 16-bit PCM WAV file using soundfile.
sf.write(
    file="Anse_wav.wav",
    data=data,
    samplerate=samplerate,
    subtype='PCM_16',
    format='WAV',
)


In [None]:

def load_audio_file(file_path):
    """Read a WAV file and return its first channel plus the sample rate.

    Args:
        file_path: Path of a WAV file.

    Returns:
        Tuple ``(audio, sample_rate)``: ``audio`` is channel 0 of the decoded
        waveform as a 1-D tensor, ``sample_rate`` is the value reported by
        ``tf.audio.decode_wav``.
    """
    waveform, sample_rate = tf.audio.decode_wav(tf.io.read_file(file_path))
    # The decoded waveform is indexed (samples, channels); keep channel 0 only.
    first_channel = waveform[:, 0]
    return first_channel, sample_rate


def preprocess_audio_mfps(audio, sample_rate):
    """Convert one waveform into a batched mel-scaled magnitude spectrogram.

    The computation uses ``tf.signal`` ops directly; a ``tf.py_function``
    wrapper is unnecessary here and would discard static shape information
    when the function is traced.

    Args:
        audio: 1-D waveform tensor.
        sample_rate: Sample rate of ``audio`` in Hz, used to place the mel
            filters.

    Returns:
        float32 tensor of shape ``(1, num_frames, 128)``.
    """
    # NOTE(review): features are computed at the clip's own sample rate and no
    # resampling is done here. Confirm the rate matches the one the model was
    # trained with, otherwise the frames cover a different time span.
    audio = tf.cast(audio, tf.float32)
    # NOTE(review): tf.audio.decode_wav presumably already returns floats in
    # [-1, 1], so this is only a constant rescale. It is kept so the features
    # stay numerically unchanged - confirm before removing.
    audio = audio / 32768.0

    stfts = tf.signal.stft(audio, frame_length=1024, frame_step=512, fft_length=1024)
    # Magnitude (not squared) spectrogram.
    spectrograms = tf.abs(stfts)

    num_spectrogram_bins = stfts.shape[-1]
    lower_edge_hertz, upper_edge_hertz, num_mel_bins = 80.0, 7600.0, 128
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins, num_spectrogram_bins, sample_rate, lower_edge_hertz, upper_edge_hertz)

    mel_spectrograms = tf.tensordot(spectrograms, linear_to_mel_weight_matrix, 1)
    # Add a leading batch axis of size 1.
    mel_spectra = tf.reshape(mel_spectrograms, [1, -1, num_mel_bins])
    return mel_spectra

def pad_sequence(seq):
    """Bring a batched spectrogram to exactly ``max_length`` frames.

    Args:
        seq: 3-D tensor whose second axis is the frame axis.

    Returns:
        Tensor with ``max_length`` entries along axis 1, zero-padded at the
        end or truncated as required.
    """
    # Clip first: a clip longer than max_length would otherwise give a
    # negative padding amount, which tf.pad rejects.
    seq = seq[:, :max_length, :]
    missing_frames = max_length - tf.shape(seq)[1]
    padded_seq = tf.pad(
        seq, paddings=[[0, 0], [0, missing_frames], [0, 0]], constant_values=0)
    return padded_seq


In [None]:

def contrastive_loss(y_true, y_pred, margin=1):
    """Contrastive loss for a network whose output is a cosine similarity.

    The Siamese network in this notebook outputs a similarity (higher means
    more alike) and labels same-class pairs with 1. The similarity is therefore
    converted to a distance ``d = 1 - y_pred`` before the usual contrastive
    formula is applied:

    * ``y_true == 1``: penalise ``d ** 2``;
    * ``y_true == 0``: penalise ``max(margin - d, 0) ** 2``.

    Applying the distance formula to the raw similarity would reward low
    similarity for same-class pairs, i.e. invert the objective.

    Args:
        y_true: 1 for same-class pairs, 0 for different-class pairs.
        y_pred: Predicted cosine similarity.
        margin: Distance below which different-class pairs are penalised.

    Returns:
        Scalar mean loss over all elements.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    # Align labels with the prediction's dtype and shape so the product below
    # is elementwise and cannot broadcast (batch,) against (batch, 1).
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), tf.shape(y_pred))

    distance = 1.0 - y_pred
    same_class_term = y_true * tf.square(distance)
    different_class_term = (1.0 - y_true) * tf.square(tf.maximum(margin - distance, 0.0))
    return tf.reduce_mean(same_class_term + different_class_term)

In [None]:
from tensorflow.keras.models import load_model

# Query clip and the reference clip it is compared against.
file_path = '/content/zwei_wav.wav'
file_path_base = '/content/Q1200901.wav'

# Same pipeline for both clips: decode -> mel spectrogram -> fixed-length padding.
audio, sample_rate = load_audio_file(file_path)
audio_base, sample_rate_base = load_audio_file(file_path_base)

processed_audio = preprocess_audio_mfps(audio, sample_rate)
processed_audio_base = preprocess_audio_mfps(audio_base, sample_rate_base)

processed_padded = pad_sequence(processed_audio)
processed_padded_base = pad_sequence(processed_audio_base)

# Restore the checkpoint; the custom loss has to be registered for loading.
# NOTE(review): this rebinds `siamese_model`, which is also the name of the
# tower-builder function defined earlier in the notebook.
siamese_model = load_model('best_model.h5', custom_objects={'contrastive_loss': contrastive_loss})

# The network takes both spectrograms as one pair.
output = siamese_model([processed_padded, processed_padded_base])

print("Model output:", output.numpy())

Model output: [[0.97178507]]


In [None]:
from tensorflow.keras.models import load_model

# Clip under test (converted from M4A above) and the reference recording.
file_path = '/content/Anse_wav.wav'
file_path_base = '/content/Q1200901.wav'

# Decode both files.
audio, sample_rate = load_audio_file(file_path)
audio_base, sample_rate_base = load_audio_file(file_path_base)

# Mel spectrograms, then padding to the training sequence length.
processed_audio = preprocess_audio_mfps(audio, sample_rate)
processed_padded = pad_sequence(processed_audio)
processed_audio_base = preprocess_audio_mfps(audio_base, sample_rate_base)
processed_padded_base = pad_sequence(processed_audio_base)

# Load the saved network, registering the custom loss by name.
# NOTE(review): this rebinds `siamese_model`, which is also the name of the
# tower-builder function defined earlier in the notebook.
siamese_model = load_model('best_model.h5', custom_objects={'contrastive_loss': contrastive_loss})

# Score the (test clip, reference clip) pair.
output = siamese_model([processed_padded, processed_padded_base])

print("Model output:", output.numpy())

Model output: [[0.47611064]]


In [None]:
from tensorflow.keras.models import load_model

# Paths of the clip to score and of the reference clip.
file_path = '/content/Jaii_wav.wav'
file_path_base = '/content/Q1200901.wav'

# Clip under test: decode, extract the mel spectrogram, pad.
audio, sample_rate = load_audio_file(file_path)
processed_audio = preprocess_audio_mfps(audio, sample_rate)
processed_padded = pad_sequence(processed_audio)

# Reference clip: identical steps.
audio_base, sample_rate_base = load_audio_file(file_path_base)
processed_audio_base = preprocess_audio_mfps(audio_base, sample_rate_base)
processed_padded_base = pad_sequence(processed_audio_base)

# Load the checkpoint with the custom loss registered.
# NOTE(review): this rebinds `siamese_model`, which is also the name of the
# tower-builder function defined earlier in the notebook.
siamese_model = load_model('best_model.h5', custom_objects={'contrastive_loss': contrastive_loss})

# Feed both inputs as a pair and print the resulting score.
output = siamese_model([processed_padded, processed_padded_base])
print("Model output:", output.numpy())

Model output: [[0.98684275]]


In [None]:
from tensorflow.keras.models import load_model

# Same comparison as the previous cell, scored with the 'best_model_5k.h5' checkpoint.
file_path = '/content/Jaii_wav.wav'
file_path_base = '/content/Q1200901.wav'

audio, sample_rate = load_audio_file(file_path)
audio_base, sample_rate_base = load_audio_file(file_path_base)

# Mel spectrogram followed by padding, for each clip.
processed_audio = preprocess_audio_mfps(audio, sample_rate)
processed_audio_base = preprocess_audio_mfps(audio_base, sample_rate_base)
processed_padded = pad_sequence(processed_audio)
processed_padded_base = pad_sequence(processed_audio_base)

# Register the custom loss function and load the model.
# NOTE(review): this rebinds `siamese_model`, which is also the name of the
# tower-builder function defined earlier in the notebook.
siamese_model = load_model(
    'best_model_5k.h5',
    custom_objects={'contrastive_loss': contrastive_loss},
)

# The model receives the two processed clips as a pair.
output = siamese_model([processed_padded, processed_padded_base])

print("Model output:", output.numpy())

Model output: [[0.94970876]]
