In [None]:
import os
import librosa
import numpy as np
import tensorflow as tf

In [None]:
# Record the TensorFlow version this notebook was run with.
tf_version = tf.__version__
print(tf_version)

## Data preparation

In [None]:
# Load every clip, grouped by word: each sub-directory of the dataset root is one word.
# NOTE: librosa.load is called without `sr`, so every clip is resampled to librosa's
# default rate (22,050 Hz) - keep that in mind when choosing the clip length in samples.
dataset_root = "dataset_directory"
audio_data = {}
# Keep only real directories (a stray file such as .DS_Store would crash the inner
# os.listdir and inflate len(words)), sorted so the order is deterministic.
words = sorted(
    entry
    for entry in os.listdir(dataset_root)
    if os.path.isdir(os.path.join(dataset_root, entry))
)
for word in words:
    word_dir = os.path.join(dataset_root, word)
    # Hidden files are not audio; skip them instead of letting librosa fail on them.
    audio_files = sorted(name for name in os.listdir(word_dir) if not name.startswith("."))
    # librosa.load returns (samples, sample_rate); only the samples are kept.
    audio_data[word] = [librosa.load(os.path.join(word_dir, file))[0] for file in audio_files]

In [None]:
# Number of samples kept per clip: 1 second at 16 kHz (a clip loaded at a different
# sample rate covers a different duration for the same number of samples).
fixed_length = 16000

X, y = [], []
for word, audio_clips in audio_data.items():
    for clip in audio_clips:
        # Force every clip to exactly fixed_length samples:
        # zero-pad short clips at the end, cut long clips.
        if len(clip) < fixed_length:
            clip = np.pad(clip, (0, fixed_length - len(clip)))
        else:
            clip = clip[:fixed_length]

        # Scale to unit L2 norm. An all-zero (silent or empty) clip has norm 0;
        # dividing by it would fill the clip with NaNs and poison training,
        # so such a clip is left as zeros.
        clip_norm = np.linalg.norm(clip)
        if clip_norm > 0:
            clip = clip / clip_norm

        X.append(clip)
        y.append(word)

In [None]:
# Truncate the data so its length is a whole multiple of batch_size
# (the trailing samples that do not fill a batch are dropped).
batch_size = 64
data_size = len(X)
truncated_size = (data_size // batch_size) * batch_size
X = X[:truncated_size]
y = y[:truncated_size]

# Number of whole batches of batch_size that remain after truncation.
# Integer division on the truncated size: the previous data_size / batch_size
# printed a fractional count that included the samples just dropped.
nums_step = truncated_size // batch_size
print(nums_step)

In [None]:
# Encode each distinct label as its index in the sorted list of unique labels
# (np.unique returns them sorted), then replace every label in y by that index.
sorted_labels = np.unique(y)
label_map = dict(zip(sorted_labels, range(len(sorted_labels))))
y = np.array(list(map(label_map.__getitem__, y)))

In [None]:
# Convert the data to TensorFlow Dataset
full_dataset = tf.data.Dataset.from_tensor_slices((X, y))

# Shuffle the full dataset ONCE. By default shuffle() draws a new order on every
# iteration, so take()/skip() below would select different samples each epoch and
# test samples would leak into training. reshuffle_each_iteration=False pins one
# order, which makes the two splits disjoint and stable across epochs.
full_dataset = full_dataset.shuffle(buffer_size=len(X), reshuffle_each_iteration=False)

# Calculate the size of train and test datasets (80% / 20%)
total_size = len(X)
train_size = int(0.8 * total_size)
test_size = total_size - train_size

# Perform the train-test split
train_dataset = full_dataset.take(train_size)
test_dataset = full_dataset.skip(train_size)

# Reshuffle only the training split at every epoch; the test split keeps its order.
# max(..., 1) because shuffle() rejects a buffer size of 0.
train_dataset = train_dataset.shuffle(buffer_size=max(train_size, 1))

# Batch the datasets
train_dataset = train_dataset.batch(32)
test_dataset = test_dataset.batch(32)

In [None]:
# Shape of one preprocessed clip: a flat vector of fixed_length samples.
input_shape = (fixed_length,)
# One class per entry in `words`.
num_words = len(words)

## Model training

In [None]:
# Model Definition
class WordDiscriminationModel(tf.keras.Model):
    """1-D convolutional word classifier.

    Pipeline: reshape to (fixed_length, 1) -> Conv1D(8 filters, kernel 13) ->
    MaxPool(3) -> Conv1D(16 filters, kernel 11) -> MaxPool(3) -> Flatten ->
    Dense(64, no activation) -> Dense(num_words, softmax).

    The layers are exposed as attributes (reshape, conv1, ..., dense1, dense2)
    so other code can reuse them individually, e.g. to read the dense1 output.
    Relies on the notebook-level `fixed_length` and `input_shape`.
    """

    def __init__(self, num_words):
        """Create the layers; `num_words` is the size of the softmax output."""
        super().__init__()

        self.reshape = tf.keras.layers.Reshape(
            (fixed_length, 1), input_shape=input_shape
        )
        self.conv1 = tf.keras.layers.Conv1D(
            8, 13, padding='valid', activation='relu'
        )
        self.maxpool1 = tf.keras.layers.MaxPooling1D(3)
        self.conv2 = tf.keras.layers.Conv1D(
            16, 11, padding='valid', activation='relu'
        )
        self.maxpool2 = tf.keras.layers.MaxPooling1D(3)
        self.flatten = tf.keras.layers.Flatten()
        # Linear layer: no activation is applied to its 64 outputs.
        self.dense1 = tf.keras.layers.Dense(64, activation=None)
        self.dense2 = tf.keras.layers.Dense(num_words, activation='softmax')

    def call(self, x):
        """Run `x` through every layer in order and return the softmax output."""
        forward_pass = (
            self.reshape,
            self.conv1,
            self.maxpool1,
            self.conv2,
            self.maxpool2,
            self.flatten,
            self.dense1,
            self.dense2,
        )
        for layer in forward_pass:
            x = layer(x)
        return x
    

In [None]:
def build_embedding_model(base_model):
    """Build a Keras model that maps a clip to the dense1 output of `base_model`.

    The returned model reuses `base_model`'s own layer objects (reshape through
    dense1, skipping dense2), applied to a new input of shape (fixed_length,).

    Args:
        base_model: object exposing the layers reshape, conv1, maxpool1, conv2,
            maxpool2, flatten and dense1 as attributes.

    Returns:
        A tf.keras.Model from the new input to the dense1 output.
    """
    inputs = tf.keras.layers.Input(shape=(fixed_length,))
    embedding_layers = (
        base_model.reshape,
        base_model.conv1,
        base_model.maxpool1,
        base_model.conv2,
        base_model.maxpool2,
        base_model.flatten,
        base_model.dense1,
    )
    outputs = inputs
    for layer in embedding_layers:
        outputs = layer(outputs)
    return tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
def dwd_loss(y_true, y_pred, labels, embeddings, N_word, M):
    """Per-sample loss: cross-entropy plus centroid softmax and contrastive terms.

    Each class centroid is the mean of the batch embeddings that carry that
    label, so the result does not depend on how samples are ordered inside the
    batch or on how many samples each class has. A class with no sample in the
    batch gets an all-zero centroid, whose cosine similarity to every embedding
    is 0.

    Args:
        y_true: integer class ids, passed to sparse categorical cross-entropy.
        y_pred: predicted class probabilities for the same samples.
        labels: integer class id of each row of `embeddings` (flattened to 1-D).
        embeddings: 2-D tensor with one embedding row per sample.
        N_word: number of classes, i.e. number of centroids.
        M: unused; kept so existing callers keep working. Centroids used to be
            taken from consecutive slices of M rows, which is only valid when
            the batch is laid out as N_word groups of M samples.

    Returns:
        1-D tensor holding one loss value per sample.
    """
    labels = tf.cast(tf.reshape(labels, [-1]), tf.int32)

    # Compute Centroids: sum the embeddings of each class, divide by its count.
    class_sums = tf.math.unsorted_segment_sum(
        embeddings, labels, num_segments=N_word
    )  # [N_word, emb_dim]
    class_counts = tf.math.unsorted_segment_sum(
        tf.ones_like(labels, dtype=embeddings.dtype), labels, num_segments=N_word
    )  # [N_word]
    # Clamp the count to 1 so classes absent from the batch give 0 instead of 0/0.
    centroids = class_sums / tf.expand_dims(tf.maximum(class_counts, 1), axis=-1)

    # Compute Cosine Similarity between every embedding and every centroid.
    normalized_embeddings = tf.nn.l2_normalize(embeddings, axis=-1)
    normalized_centroids = tf.nn.l2_normalize(centroids, axis=-1)
    similarity_matrix = tf.matmul(
        normalized_embeddings, normalized_centroids, transpose_b=True
    )  # [batch, N_word]

    # Mask selecting, for each sample, the column of its own class.
    own_class = tf.one_hot(labels, N_word, dtype=similarity_matrix.dtype)  # [batch, N_word]

    # Softmax loss: -S_ii + log(sum_k exp(S_ik))
    true_similarity = tf.reduce_sum(similarity_matrix * own_class, axis=1)  # [batch]
    L_sm = -true_similarity + tf.math.log(
        tf.reduce_sum(tf.exp(similarity_matrix), axis=1)
    )  # [batch]

    # Contrastive Centroid Loss: pull towards the own centroid, push away from
    # the closest other centroid (the own column is pushed out of the max by -1e6).
    S_ii = true_similarity  # [batch]
    S_ik = tf.reduce_max(similarity_matrix - own_class * 1e6, axis=1)  # [batch]
    L_cc = (1 - S_ii) + S_ik

    # DWD loss
    L_dwd = L_sm + L_cc
    # Standard cross-entropy loss
    xentropy_loss = tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred)
    # Combined loss
    final_loss = xentropy_loss + L_dwd
    return final_loss


In [None]:
# Instantiate the classifier with one softmax output per word.
model = WordDiscriminationModel(num_words)

In [None]:
# Optimizer and loss function
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()

# Early-stopping bookkeeping
best_val_loss = float("inf")  # lowest validation loss seen so far; starts at +infinity
# no_improvement_epochs: epochs whose validation loss did not beat best_val_loss
# increasing_loss_epochs: epochs in a row whose validation loss was above best_val_loss
no_improvement_epochs = increasing_loss_epochs = 0

In [None]:
# N_word: number of unique words; M: number of samples per word in each batch.
N_word, M = num_words, 3

In [None]:
# Initialize metrics
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()


def _embed(batch):
    """Return the dense1 output of `model` for `batch` (every layer except dense2)."""
    features = model.reshape(batch)
    features = model.maxpool1(model.conv1(features))
    features = model.maxpool2(model.conv2(features))
    return model.dense1(model.flatten(features))


# Custom Training Loop
for epoch in range(500):  # Upper bound on epochs; early stopping below may end sooner
    # ---- Training ----
    train_loss_sum = 0.0
    train_count = 0
    for train_x, train_y in train_dataset:
        with tf.GradientTape() as tape:
            preds = model(train_x)
            # Embeddings (output of the dense1 layer) for the centroid terms
            embeddings = _embed(train_x)
            # Per-sample loss vector. N_word is the class count defined above
            # (this call used to pass an undefined name and raised NameError).
            loss = dwd_loss(train_y, preds, train_y, embeddings, N_word, M)

        # Gradient Descent (gradient of the sum of the per-sample losses)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        # Accumulate sum and sample count so the epoch average is a true
        # per-sample mean over ALL batches, not just the last one.
        train_loss_sum += float(tf.reduce_sum(loss))
        train_count += int(tf.size(loss))

        # Update accuracy metric
        train_accuracy.update_state(train_y, preds)

    print(f"Training Accuracy after epoch {epoch}: {train_accuracy.result().numpy()}")
    train_accuracy.reset_states()

    # ---- Validation ----
    val_loss_sum = 0.0
    val_count = 0
    for test_x, test_y in test_dataset:
        # Get predictions and embeddings
        val_preds = model(test_x)
        val_embeddings = _embed(test_x)

        # Compute validation loss using the custom loss function
        batch_val_loss = dwd_loss(test_y, val_preds, test_y, val_embeddings, N_word, M)
        # Reduce each batch to scalars before accumulating: adding the raw
        # per-sample vectors fails when the last batch is smaller than the others.
        val_loss_sum += float(tf.reduce_sum(batch_val_loss))
        val_count += int(tf.size(batch_val_loss))
        val_accuracy.update_state(test_y, val_preds)

    # Per-sample averages; NaN when a split is empty (a NaN validation loss never
    # counts as an improvement, so early stopping still terminates the loop).
    training_loss = train_loss_sum / train_count if train_count else float("nan")
    validation_loss = val_loss_sum / val_count if val_count else float("nan")
    print(f"Epoch {epoch+1}: Training loss : {training_loss} , Validation Loss: {validation_loss}")
    print(f"Validation Accuracy after epoch {epoch}: {val_accuracy.result().numpy()}")
    val_accuracy.reset_states()

    # Early stopping logic
    if validation_loss < best_val_loss:
        # New best: remember it and reset both counters
        best_val_loss = validation_loss
        no_improvement_epochs = 0
        increasing_loss_epochs = 0
    else:
        no_improvement_epochs += 1
        if validation_loss > best_val_loss:
            increasing_loss_epochs += 1
        else:
            increasing_loss_epochs = 0  # Loss equal to the best: not counted as increasing

    # Stop after 10 epochs without improvement or 3 consecutive epochs above the best loss
    if no_improvement_epochs >= 10 or increasing_loss_epochs >= 3:
        print(f"Early stopping triggered at epoch {epoch}")
        break

In [None]:
# Persist the trained classifier in TensorFlow SavedModel format.
model.save(
    "word_discrimination_model",
    save_format="tf",
)

In [None]:
# Wrap the trained layers up to dense1 in a standalone model and save it
# in TensorFlow SavedModel format.
embedding_model = build_embedding_model(base_model=model)
embedding_model.save(
    "word_embedding_model",
    save_format="tf",
)

## Inference

In [None]:
def cosine_similarity(vector1, vector2):
    """Return the cosine similarity of two 1-D vectors.

    Args:
        vector1: first vector (1-D array-like).
        vector2: second vector, same length as `vector1`.

    Returns:
        dot(vector1, vector2) / (||vector1|| * ||vector2||), or 0.0 when either
        vector has zero norm (the ratio is undefined there and would otherwise
        come out as NaN/inf with a runtime warning).
    """
    dot_product = np.dot(vector1, vector2)

    # Product of the two L2 norms
    norm_product = np.linalg.norm(vector1) * np.linalg.norm(vector2)
    if norm_product == 0:
        return 0.0

    return dot_product / norm_product

In [None]:
# Reload the saved embedding model from disk.
loaded_wordembedding_model = tf.keras.models.load_model(
    filepath="word_embedding_model"
)

In [None]:
# Load the first clip; librosa.load returns (samples, sample_rate), only samples are kept.
audio_clip = librosa.load("audio_path1")[0]

# Force the clip to exactly fixed_length samples: zero-pad if short, cut if long.
if len(audio_clip) < fixed_length:
    audio_clip = np.pad(audio_clip, (0, fixed_length - len(audio_clip)))
else:
    audio_clip = audio_clip[:fixed_length]

# Normalize to unit L2 norm. An all-zero (silent) clip has norm 0 and dividing
# would turn it into NaNs, so it is left as zeros.
audio_clip_norm = np.linalg.norm(audio_clip)
if audio_clip_norm > 0:
    audio_clip = audio_clip / audio_clip_norm

audio_clip = audio_clip.reshape(1, -1)  # Add a leading batch dimension of size 1

# Generate embedding
embedding_of_word = loaded_wordembedding_model.predict(audio_clip)


In [None]:
# Load the second clip; librosa.load returns (samples, sample_rate), only samples are kept.
audio_clip2 = librosa.load("audio_path2")[0]

# Force the clip to exactly fixed_length samples: zero-pad if short, cut if long.
if len(audio_clip2) < fixed_length:
    audio_clip2 = np.pad(audio_clip2, (0, fixed_length - len(audio_clip2)))
else:
    audio_clip2 = audio_clip2[:fixed_length]

# Normalize to unit L2 norm. An all-zero (silent) clip has norm 0 and dividing
# would turn it into NaNs, so it is left as zeros.
audio_clip2_norm = np.linalg.norm(audio_clip2)
if audio_clip2_norm > 0:
    audio_clip2 = audio_clip2 / audio_clip2_norm

audio_clip2 = audio_clip2.reshape(1, -1)  # Add a leading batch dimension of size 1

# Generate embedding
embedding_of_word2 = loaded_wordembedding_model.predict(audio_clip2)



In [None]:
# Collapse both predict() outputs to 1-D vectors, compare them and report the score.
embedding_of_word_f = embedding_of_word.reshape(-1)
embedding_of_wor2_f = embedding_of_word2.reshape(-1)

similarity_score = cosine_similarity(
    vector1=embedding_of_word_f,
    vector2=embedding_of_wor2_f,
)

print(f"Cosine Similarity Score: {similarity_score}")