In [2]:
import openl3
import soundfile as sf
import json
import numpy as np
import os
import time

In [3]:
# Root folder of the input recordings. process_word() looks for the clips of a
# word in the sub-folder named after that word (audio_folder/<word>/).
audio_folder = 'input_audios'

# JSON file that process_word() reads and rewrites with the averaged embeddings.
output_file = "embeddings/embeddings.json"

In [None]:
# Load the OpenL3 audio embedding model once up front; process_audio() reads
# this module-level `model` for every clip it embeds.
model = openl3.models.load_audio_embedding_model(
    input_repr="mel256",
    content_type="music",
    embedding_size=512,
)

In [4]:
# Load audio file

def extract_audio(audio_path):
    """Read an audio file and return ``(samples, sample_rate)``.

    The file is read with ``sf.read``. On success the sample rate is printed
    and both values are returned. If anything goes wrong while reading, the
    error is printed and ``(None, None)`` is returned instead of raising, so
    callers must check for ``None``.
    """
    samples, sample_rate = None, None

    try:
        samples, sample_rate = sf.read(audio_path)
        print(sample_rate)
    except Exception as read_error:
        # Best effort: report the problem and let the caller skip this file.
        print(f"Error opening the audio file: {read_error}")

    return samples, sample_rate

In [5]:
def process_audio(audio, sr):
    """Return the OpenL3 embeddings for one clip, or ``None`` if input is missing.

    ``audio`` and ``sr`` are forwarded to ``openl3.get_audio_embedding`` together
    with the module-level ``model``. The timestamps returned alongside the
    embeddings are only printed. When either argument is ``None`` a message is
    printed and ``None`` is returned without calling OpenL3.
    """
    # Guard clause: nothing to embed if loading the clip failed upstream.
    if audio is None or sr is None:
        print("No generated embeddings for the given audio file")
        return None

    # Extract audio features using OpenL3
    clip_embeddings, frame_times = openl3.get_audio_embedding(audio, sr, model)
    print("timestamps: ", frame_times)
    return clip_embeddings

In [6]:

def calculate_avg_embedding(all_embeddings):
    """Average several embedding matrices element-wise.

    Each item of ``all_embeddings`` must be a NumPy array with at least two
    dimensions; its first axis (rows) may differ between items. Shorter arrays
    are padded at the end with rows of zeros up to the longest row count, then
    all arrays are stacked and averaged over the stacking axis.

    NOTE: because the padding is zeros, rows that exist only in the longer
    arrays are averaged together with zeros from the shorter ones.

    Parameters:
        all_embeddings: non-empty sequence of arrays sharing the same size on
            the second axis. The sequence and its arrays are not modified.

    Returns:
        NumPy array holding the mean over all (padded) inputs; for 2-D inputs
        its shape is ``(max_rows, embedding.shape[1])``.

    Raises:
        ValueError: if ``all_embeddings`` is empty.
    """
    if len(all_embeddings) == 0:
        raise ValueError("calculate_avg_embedding requires at least one embedding")

    # Find the maximum length of embeddings
    max_length = max(embedding.shape[0] for embedding in all_embeddings)

    # Build the padded copies in a new list so the caller's list stays intact.
    padded_embeddings = []
    for embedding in all_embeddings:
        padding_rows = max_length - embedding.shape[0]
        if padding_rows > 0:
            # Pad with zeros to match the max length
            zeros_padding = np.zeros((padding_rows, embedding.shape[1]))
            embedding = np.vstack((embedding, zeros_padding))
        padded_embeddings.append(embedding)

    # Stack into one array and take the mean across the inputs.
    return np.mean(np.array(padded_embeddings), axis=0)

def _load_embedding_store(path):
    """Return the list of word entries stored in the JSON file at ``path``.

    A missing or zero-byte file yields an empty list so that the very first
    run can create the store. Any other content is parsed with ``json.load``;
    malformed JSON still raises, so existing data is never silently replaced.
    """
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        return []
    with open(path, 'r') as json_file:
        return json.load(json_file)


def _save_embedding_store(path, entries):
    """Write ``entries`` to ``path`` as indented JSON, creating its folder if needed."""
    parent_folder = os.path.dirname(path)
    if parent_folder:
        os.makedirs(parent_folder, exist_ok=True)
    with open(path, 'w') as json_file:
        json.dump(entries, json_file, indent=2)


def process_word(word):
    """Run the whole pipeline for one word and store its averaged embedding.

    Steps:
      1. Collect every file ending in ``-clean.wav`` from the sub-folder
         ``word`` of the module-level ``audio_folder``.
      2. Load each file with ``extract_audio`` and embed it with
         ``process_audio``; files that fail either step are skipped.
      3. Average the embeddings with ``calculate_avg_embedding``.
      4. Add the average to the JSON store at the module-level ``output_file``
         under the entry whose ``"word"`` equals ``word`` (created if absent),
         keyed by the current time in milliseconds since the epoch.

    Nothing is written when no embedding could be produced.

    Parameters:
        word: name of the sub-folder to process; also used as the ``"word"``
            value in the store.

    Raises:
        OSError: from ``os.listdir`` if the word's folder cannot be listed.
    """
    final_folder = os.path.join(audio_folder, word)

    # os.listdir returns names in arbitrary order; sort for reproducible runs.
    audio_files = sorted(
        os.path.join(final_folder, filename)
        for filename in os.listdir(final_folder)
        if filename.endswith('-clean.wav')
    )
    print(audio_files)

    # Iterate through the audio files and process them
    all_embeddings = []
    for audio_path in audio_files:
        print('Processing file: ', audio_path)
        audio, sr = extract_audio(audio_path)
        if audio is None or sr is None:
            continue
        embeddings = process_audio(audio, sr)
        if embeddings is not None:
            all_embeddings.append(embeddings)

    if not all_embeddings:
        print(f"No embeddings generated for the word {word}")
        return

    average_embedding = calculate_avg_embedding(all_embeddings)
    existing_data = _load_embedding_store(output_file)

    # Reuse the entry of this word if there is one, otherwise start a new one.
    entry = next((item for item in existing_data if item["word"] == word), None)
    if entry is None:
        entry = {"word": word, "embeddings": {}}
        existing_data.append(entry)

    # Get current time in milliseconds (since epoch); it keys this run's result.
    current_time_ms = str(int(time.time() * 1000))
    entry["embeddings"][current_time_ms] = average_embedding.tolist()

    # Serialize the updated data to JSON and save it back to the file
    _save_embedding_store(output_file, existing_data)

    print(f'Done creating embeddings for word: {word}')

In [7]:
# Generate embeddings one word at a time. Each call reads the clips found in
# audio_folder/<word>/ and adds a new averaged embedding for that word to
# output_file. Only the uncommented call below runs; uncomment other words to
# (re)process them.

# process_word('mussa')
# process_word('moja')
# process_word('mbili')
# process_word('tatu')
# process_word('nne')
# process_word('tano')
# process_word('sita')
# process_word('saba')
# process_word('nane')
# process_word('tisa')
# process_word('kumi')
process_word('sifuri')


['input_audios/sifuri\\1695602217446-clean.wav', 'input_audios/sifuri\\1695602271368-clean.wav']
Processing file:  input_audios/sifuri\1695602217446-clean.wav
16000
timestamps:  [0.  0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.  1.1 1.2]
Processing file:  input_audios/sifuri\1695602271368-clean.wav
16000
timestamps:  [0.  0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]
Done creating embeddings for word: sifuri
