# Mass Embedding of Bioacoustic Audio

This notebook facilitates pre-computing embeddings of audio data for subsequent
use with search, classification, and analysis.

# ATTENTION:

There is a new version of this workflow available [here](https://github.com/google-research/perch-hoplite/blob/main/perch_hoplite/agile/1_embed_audio_v2.ipynb), in the new [Perch-Hoplite](https://github.com/google-research/perch-hoplite/blob/main/perch_hoplite) repository.

## Configuration and Imports.

In [None]:
#@title Installation. { vertical-output: true }
#@markdown Run this notebook in Google Colab by following [this link](https://colab.research.google.com/github/google-research/perch/blob/main/embed_audio.ipynb).
#@markdown
#@markdown Run this cell to install the project dependencies.
# Installs the Perch project straight from its GitHub repository. The URL
# carries no tag or commit, so the installed version is not pinned.
%pip install git+https://github.com/google-research/perch.git


In [None]:
#@title Imports. { vertical-output: true }

from etils import epath
from ml_collections import config_dict
import numpy as np
import tensorflow as tf
import tqdm
from chirp.inference import colab_utils
colab_utils.initialize(use_tf_gpu=True, disable_warnings=True)

from chirp import audio_utils
from chirp.inference import embed_lib
from chirp.inference import tf_examples
from perch_hoplite.zoo import model_configs


In [None]:
#@title Basic Configuration. { vertical-output: true }

# Notebook-wide settings: which preset model to use and where outputs live.

#@markdown Define the model: perch or birdnet are most common for birds.
model_choice = 'perch_8'  #@param['perch_8', 'humpback', 'multispecies_whale', 'surfperch', 'birdnet_V2.3']
#@markdown Set the base directory for the project.
working_dir = '/content/test_solo'  #@param

# Set the embedding and labeled data directories.
# `embeddings_path` becomes `config.output_dir` in the embedding configuration.
# NOTE(review): the classifier cell further down reads embeddings from
# '/content/drive/MyDrive/google COLAB/RESULT/...' rather than from this
# working_dir -- confirm which location is intended.
embeddings_path = epath.Path(working_dir) / 'embeddings'
labeled_data_path = epath.Path(working_dir) / 'labeled'
# Pattern for the 'embeddings-*' files inside `embeddings_path`.
embeddings_glob = embeddings_path / 'embeddings-*'

# OPTIONAL: Set up separation model.
# An empty `separation_model_path` is the default here; neither value is
# referenced by the other cells visible in this notebook.
separation_model_key = 'separator_model_tf'  #@param
separation_model_path = ''  #@param


## Embed Audio

In [None]:
#@title Embedding Configuration. { vertical-output: true }

# Builds the `config` object consumed by the set-up and embedding cells.
config = config_dict.ConfigDict()
config.embed_fn_config = config_dict.ConfigDict()
config.embed_fn_config.model_config = config_dict.ConfigDict()

#@markdown IMPORTANT: Select the target audio files.
#@markdown source_file_patterns should contain a list of globs of audio files, like:
#@markdown ['/home/me/*.wav', '/home/me/other/*.flac']
# NOTE(review): this currently points at one specific .wav file on Google
# Drive rather than a glob; widen it to embed a whole folder.
config.source_file_patterns = ['/content/drive/MyDrive/google COLAB/Penguin/Archive/13B3-1_clipped/13B3-1_exhale-i10_10.wav']  #@param
config.output_dir = embeddings_path.as_posix()

# Fill in the model key and model config from the preset chosen in
# `model_choice`; this replaces the empty model_config created above.
preset_info = model_configs.get_preset_model_config(model_choice)
config.embed_fn_config.model_key = preset_info.model_key
config.embed_fn_config.model_config = preset_info.model_config

# Write embeddings and logits; skip separated and raw audio to reduce size.
config.embed_fn_config.write_embeddings = True
config.embed_fn_config.write_logits = True
config.embed_fn_config.write_separated_audio = False
config.embed_fn_config.write_raw_audio = False

#@markdown File sharding automatically splits audio files into one-minute chunks
#@markdown for embedding. This limits both system and GPU memory usage,
#@markdown especially useful when working with long files (>1 hour).
use_file_sharding = True  #@param {type:'boolean'}
if use_file_sharding:
  # When sharding is off, `shard_len_s` stays unset and later cells fall back
  # to their `config.get('shard_len_s', ...)` defaults.
  config.shard_len_s = 60.0

# Number of parent directories to include in the filename.
config.embed_fn_config.file_id_depth = 1

In [None]:
#@title Set up. { vertical-output: true }

# Set up the embedding function, including loading models.
# `embed_fn` is reused by every later cell that embeds audio.
embed_fn = embed_lib.EmbedFn(**config.embed_fn_config)
print('\n\nLoading model(s)...')
embed_fn.setup()

# Create output directory and write the configuration.
output_dir = epath.Path(config.output_dir)
output_dir.mkdir(exist_ok=True, parents=True)
embed_lib.maybe_write_config(config, output_dir)

# Create SourceInfos.
# Both sharding options fall back to -1 when the key is absent from `config`.
source_infos = embed_lib.create_source_infos(
    config.source_file_patterns,
    num_shards_per_file=config.get('num_shards_per_file', -1),
    shard_len_s=config.get('shard_len_s', -1))
print(f'Found {len(source_infos)} source infos.')

print('\n\nTest-run of model...')
# Embed an all-zero buffer exactly one model window long
# (sample_rate * window_size_s samples) as a first inference call.
window_size_s = config.embed_fn_config.model_config.window_size_s
sr = config.embed_fn_config.model_config.sample_rate
z = np.zeros([int(sr * window_size_s)], dtype=np.float32)
embed_fn.embedding_model.embed(z)
print('Setup complete!')

In [None]:
#@title Run embedding. { vertical-output: true }

# Embeds every source info that does not already have an embedding on disk and
# writes the results as TFRecord files in `output_dir`.
#
# Uses multiple threads to load audio before embedding.
# This tends to be faster, but can fail if any audio files are corrupt.

embed_fn.min_audio_s = 1.0
record_file = (output_dir / 'embeddings.tfrecord').as_posix()
# succ: examples written; fail: audio_to_example returned None;
# skipped: clips rejected by embed_fn.validate_audio.
succ, fail, skipped = 0, 0, 0

existing_embedding_ids = embed_lib.get_existing_source_ids(
    output_dir, 'embeddings-*')

new_source_infos = embed_lib.get_new_source_infos(
    source_infos, existing_embedding_ids, config.embed_fn_config.file_id_depth)

print(f'Found {len(existing_embedding_ids)} existing embedding ids. \n'
      f'Processing {len(new_source_infos)} new source infos. ')

# Bind the name before entering `try`: the `finally` clause references it, and
# without this an exception raised while building the loader would surface as
# a NameError that hides the real failure.
audio_iterator = None
try:
  audio_loader = lambda fp, offset: audio_utils.load_audio_window(
      fp, offset, sample_rate=config.embed_fn_config.model_config.sample_rate,
      window_size_s=config.get('shard_len_s', -1.0))
  audio_iterator = audio_utils.multi_load_audio_window(
      filepaths=[s.filepath for s in new_source_infos],
      offsets=[s.shard_num * s.shard_len_s for s in new_source_infos],
      audio_loader=audio_loader,
  )
  with tf_examples.EmbeddingsTFRecordMultiWriter(
      output_dir=output_dir, num_files=config.get('tf_record_shards', 1)) as file_writer:
    for source_info, audio in tqdm.tqdm(
        zip(new_source_infos, audio_iterator), total=len(new_source_infos)):
      if not embed_fn.validate_audio(source_info, audio):
        # Count rejected clips so they are visible in the final summary.
        skipped += 1
        continue
      file_id = source_info.file_id(config.embed_fn_config.file_id_depth)
      offset_s = source_info.shard_num * source_info.shard_len_s
      example = embed_fn.audio_to_example(file_id, offset_s, audio)
      if example is None:
        fail += 1
        continue
      file_writer.write(example.SerializeToString())
      succ += 1
    file_writer.flush()
finally:
  # Drop the reference to the audio iterator whether or not embedding succeeded.
  del audio_iterator
print(f'\n\nSuccessfully processed {succ} source_infos, failed {fail} times.')
if skipped:
  print(f'Skipped {skipped} source_infos that did not pass audio validation.')

# Read back the first written example as a sanity check of the output files.
fns = [fn for fn in output_dir.glob('embeddings-*')]
ds = tf.data.TFRecordDataset(fns)
parser = tf_examples.get_example_parser()
ds = ds.map(parser)
for ex in ds.as_numpy_iterator():
  print(ex['filename'])
  print(ex['embedding'].shape, flush=True)
  break


In [None]:
import librosa
import numpy as np

# Embed one clip with the already-loaded model and inspect the raw outputs.

# PATH
audio_path = '/content/drive/MyDrive/google COLAB/Penguin/Archive/14B19-1_clipped/14B19-1_exhale-i100_153.wav'

# Resample to the rate the selected model is configured for instead of a
# hard-coded 32 kHz, so this cell stays correct when `model_choice` changes.
model_sample_rate = config.embed_fn_config.model_config.sample_rate
audio, sample_rate = librosa.load(audio_path, sr=model_sample_rate)

# Input the audio to Perch
outputs = embed_fn.embedding_model.embed(audio)
# Inspect
print("\n--- INSPECTING THE OUTPUT ---")

# Inspecting the Embeddings
# How the model represents the audio numerically.
print(f"Embeddings Shape: {outputs.embeddings.shape}")

# Inspecting the Logits
# Represents the model's attempt to guess what it just heard.
# NOTE(review): assumes the model exposes a 'label' logits head -- confirm for
# presets other than perch.
print(f"Logits Shape:     {outputs.logits['label'].shape}")

# Average the logits over axis 0, then take the class with the highest
# averaged score as the overall prediction for this clip.
mean_logits = np.mean(outputs.logits['label'], axis=0)
top_prediction_index = np.argmax(mean_logits)

print(f"\nPerch's Top Prediction (Class ID): {top_prediction_index}")

In [None]:
import collections
import librosa
from scipy.special import softmax

def analyze_top_frequency(folder_path, penguin_name, sample_rate=32000):
    """Report which class the model most often ranks first across a folder.

    Every ``*.wav`` file directly inside ``folder_path`` is loaded, embedded
    with the notebook-level ``embed_fn``, and reduced to a single top class by
    averaging the ``'label'`` logits over axis 0 and applying a softmax.

    Args:
        folder_path: Directory that is scanned for ``*.wav`` files.
        penguin_name: Name used only in the printed report.
        sample_rate: Rate in Hz passed to ``librosa.load``. Defaults to 32000,
            the value that was previously hard-coded.

    Returns:
        A tuple ``(most_common_id, avg_winning_conf)``: the class index that
        was the top prediction for the most files, and the mean softmax
        probability (as a percentage) over the files where it was on top.

    Raises:
        FileNotFoundError: If ``folder_path`` contains no ``*.wav`` files.
    """
    files = tf.io.gfile.glob(f"{folder_path}/*.wav")
    if not files:
        # Without this guard the function would fail later with an opaque
        # IndexError on the empty counter.
        raise FileNotFoundError(
            f"No .wav files found in {folder_path!r} for {penguin_name}; "
            "check the path and that the drive is mounted.")

    all_top_indices = []
    all_confidences = []

    print(f"Scanning {len(files)} files for {penguin_name}...")

    for f in files:
        audio, _ = librosa.load(f, sr=sample_rate)
        out = embed_fn.embedding_model.embed(audio)

        # Get probabilities via Softmax
        raw_logits = np.mean(out.logits['label'], axis=0)
        probs = softmax(raw_logits)

        # Track the #1 guess for this file
        top_idx = np.argmax(probs)
        all_top_indices.append(top_idx)
        all_confidences.append(probs[top_idx])

    # Find the Most Frequent ID
    counter = collections.Counter(all_top_indices)
    most_common_id, frequency = counter.most_common(1)[0]

    # Get the average confidence for that specific ID
    # We only average the confidence scores when the model picked the winning ID
    winning_confidences = [
        c for i, c in zip(all_top_indices, all_confidences) if i == most_common_id
    ]
    avg_winning_conf = np.mean(winning_confidences) * 100

    print(f"\n--- {penguin_name} MAJORITY ANALYSIS ---")
    print(f"TOP FREQUENT ID:    {most_common_id}")
    print(f"REPETITION RATE:    {frequency}/{len(files)} files ({(frequency/len(files)*100):.2f}%)")
    print(f"AVG CONFIDENCE:     {avg_winning_conf:.2f}%")
    print("--------")

    return most_common_id, avg_winning_conf

# Majority-vote analysis, once per penguin's folder of clipped recordings.
id_13b3, conf_13b3 = analyze_top_frequency(
    folder_path='/content/drive/MyDrive/google COLAB/Penguin/Archive/13B3-1_clipped',
    penguin_name='13B3-1',
)
id_14b19, conf_14b19 = analyze_top_frequency(
    folder_path='/content/drive/MyDrive/google COLAB/Penguin/Archive/14B19-1_clipped',
    penguin_name='14B19-1',
)

In [None]:
from chirp.inference import tf_examples
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [None]:
# Accumulators filled by load_and_extract:
#   X -> one feature vector per embedded example (model inputs)
#   y -> the matching integer label for each entry of X
X, y = [], []

In [None]:
def load_and_extract(folder_path, penguin_label):
    """Append mean-pooled embeddings from a folder to the global ``X`` / ``y``.

    Reads every ``embeddings-*`` TFRecord file in ``folder_path``, averages
    each example's ``'embedding'`` over its first two axes, and appends the
    resulting vector to the notebook-level list ``X`` and ``penguin_label`` to
    the notebook-level list ``y``.

    Args:
        folder_path: Directory containing ``embeddings-*`` TFRecord files.
        penguin_label: Label stored in ``y`` for every example in the folder.

    Raises:
        FileNotFoundError: If no ``embeddings-*`` files exist in
            ``folder_path``.
    """
    # Retrieve all embedded files
    fns = tf.io.gfile.glob(f"{folder_path}/embeddings-*")
    if not fns:
        # Fail here with a clear message instead of handing an empty file list
        # to TFRecordDataset and failing later in a less obvious way.
        raise FileNotFoundError(
            f"No 'embeddings-*' files found in {folder_path!r}; "
            "run the embedding step for this folder first.")

    # Use TFRecordDataset for memory-efficient data loading
    # as loading embeddings into RAM all at once can cause crashes
    ds = tf.data.TFRecordDataset(fns)
    parser = tf_examples.get_example_parser()
    ds = ds.map(parser)

    for ex in ds.as_numpy_iterator():
        # Assumes each embedding is laid out as (time, channel, features)
        # -- TODO confirm against the embedding model's output.
        emb = ex['embedding']

        # Average over axis 0 and axis 1, keeping only the feature axis.
        mean_emb = np.mean(emb, axis=(0, 1))
        # Store the pooled features (X) and the corresponding label (y)
        X.append(mean_emb)
        y.append(penguin_label)


# Assigning labels: 0 for 13B3-1 and 1 for 14B19-1.
# Binary classification

# Start from fresh accumulators on every run of this cell. X and y are rebound
# to NumPy arrays further down, so without this reset a second run would call
# .append on an ndarray (or duplicate samples left over from a previous run).
X = []
y = []

# Point these to the output_dir folders
print("Loading Penguin 13B3-1 data...")
load_and_extract('/content/drive/MyDrive/google COLAB/RESULT/PEN_13B3/embeddings', 0)

print("Loading Penguin 14B19-1 data...")
load_and_extract('/content/drive/MyDrive/google COLAB/RESULT/PEN_14B19/embeddings', 1)

X = np.array(X)
y = np.array(y)

# Check if the audio loaded or not
print(f"Total dataset ready: {X.shape[0]} audio windows loaded.")

# Split the data
# Using a 75/25 split to ensure the model is evaluated on 'unseen' data
# X_train: feature vectors the Random Forest learns from
# y_train: labels (0 or 1) telling the model which penguin each X_train row is
# X_test: the 25% of the data the model has not seen
# y_test: labels for X_test
# NOTE(review): the split is random per example; if several examples come from
# the same recording they can land on both sides of the split -- confirm this
# is acceptable, and consider stratify=y if the classes are imbalanced.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

# Train the Random Forest
print("\nTraining Random Forest Classifier...")
# n_estimators=100 == 100 individual decision trees
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)

# Evaluate the Custom Model
# Given 25% test data
rf_accuracy = accuracy_score(y_test, rf_model.predict(X_test))



# --- FINAL COMPARISON---
print("\n--- RESULTS ---")
print(f"Random Forest Accuracy: {rf_accuracy * 100:.2f}%")

In [None]:
# Mount Google Drive at /content/drive.
# NOTE(review): earlier cells read audio and embeddings from
# '/content/drive/MyDrive/...' paths, so this mount presumably has to run
# before them -- consider moving this cell to the top of the notebook.
from google.colab import drive
drive.mount('/content/drive')