#### Import Necessary Libraries

In [1]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"  # "jax" or "tensorflow" or "torch"

import keras_cv
import keras
import keras.backend as K
import tensorflow as tf
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm

import librosa
import IPython.display as ipd
import librosa.display as lid

import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.pylab as ply
import ipywidgets as widgets
import seaborn as sns

import tensorflow_hub as hub
import tensorflow_io as tfio

from itertools import cycle
import soundfile as sf
# Set interactive backend
%matplotlib inline


cmap = mpl.cm.get_cmap('coolwarm')
sns.set_theme(style="white", palette=None)
color_pal = ply.rcParams["axes.prop_cycle"].by_key()["color"]
color_cycle = cycle(ply.rcParams["axes.prop_cycle"].by_key()["color"])


  cmap = mpl.cm.get_cmap('coolwarm')


#### About YAMNet

YAMNet is a pre-trained neural network that employs the MobileNetV1 depthwise-separable convolution architecture. It can use an audio waveform as input and make independent predictions for each of the 521 audio events from the AudioSet corpus.

Internally, the model extracts "frames" from the audio signal and processes batches of these frames. This version of the model uses frames that are 0.96 second long and extracts one frame every 0.48 seconds .

The model accepts a 1-D float32 Tensor or NumPy array containing a waveform of arbitrary length, represented as single-channel (mono) 16 kHz samples in the range [-1.0, +1.0].

The model returns 3 outputs, including the class scores, embeddings (which you will use for transfer learning), and the log mel spectrogram.



In [2]:
### Load Dataset
DATASET_PATH = 'content/birdclef-2024'
## To handle our settings  and configurations, let's create a class
class Config:    
    #Yamnet Model
    sample_rate = 32000
    preset = 'https://tfhub.dev/google/yamnet/1'
    class_names = sorted(os.listdir(f'{DATASET_PATH}/train_audio/'))
    num_classes = len(class_names)
    class_labels = list(range(num_classes))
    label2name = dict(zip(class_labels, class_names))
    name2label = {v:k for k,v in label2name.items()}
    # Define split ratios
    train_ratio = 0.8  # 80% for training
    val_ratio = 0.1    # 10% for validation
    test_ratio = 0.1   # 10% for testing 
    batch_size = 32
    audio_len = 1024
    epochs = 10
    AUTOTUNE = tf.data.AUTOTUNE

### Load and Explore the dataset

In [3]:
df = pd.read_csv(f'{DATASET_PATH}/train_metadata.csv')
df['filepath'] = DATASET_PATH + '/train_audio/' + df.filename
df['target'] = df.primary_label.map(Config.name2label)
df['filename'] = df.filepath.map(lambda x: x.split('/')[-1])
df['xc_id'] = df.filepath.map(lambda x: x.split('/')[-1].split('.')[0])

## display a few rows of the dataframe
df = df.sample(frac=1, random_state=42)
df.head(5)
for row in df.head(5).iterrows():
    print(row[1].filepath)

content/birdclef-2024/train_audio/blrwar1/XC184748.ogg
content/birdclef-2024/train_audio/whtkin2/XC797017.ogg
content/birdclef-2024/train_audio/hoopoe/XC349675.ogg
content/birdclef-2024/train_audio/grnsan/XC478932.ogg
content/birdclef-2024/train_audio/tibfly3/XC645726.ogg


#### Function to convert .ogg to .wav while maintaining directory structure

In [5]:
## Function to convert ogg to wav
# Function to convert .ogg to .wav while maintaining directory structure

import os
from pydub import AudioSegment
import pandas as pd

# Define paths
dataset_path = "content/birdclef-2024"
train_audio_path = os.path.join(dataset_path, "train_audio")
output_path = os.path.join(dataset_path, "train_wav_audio")

# Create the output directory if it doesn't exist
os.makedirs(output_path, exist_ok=True)

def convert_ogg_to_wav(input_path, output_path):
    # Load the .ogg file
    audio = AudioSegment.from_ogg(input_path)
    # Export as .wav
    audio.export(output_path, format="wav")

# Iterate over each row in the dataset
# for index, row in df.iterrows():
#     ogg_filepath = row["filepath"]

#     # Extract subdirectory name
#     relative_path = os.path.relpath(ogg_filepath, train_audio_path)  # e.g., "blrwar1/XC184748.ogg"
#     subdir = os.path.dirname(relative_path)  # e.g., "blrwar1"
    
#     # Construct output directory
#     output_subdir = os.path.join(output_path, subdir)
#     os.makedirs(output_subdir, exist_ok=True)  # Ensure subdirectory exists
    
#     # Construct the output file path
#     filename = os.path.basename(ogg_filepath).replace(".ogg", ".wav")
#     wav_filepath = os.path.join(output_subdir, filename)
    
#     # Convert .ogg to .wav
#     convert_ogg_to_wav(ogg_filepath, wav_filepath)
#     print(f"Converted: {ogg_filepath} -> {wav_filepath}")
# print("Conversion complete!")

#### Function to load audio files, which will also be used later when working with the training data.

In [6]:
# Load the dataset
df['filepath'] = df.filepath.map(lambda x: x.replace('train_audio', 'train_wav_audio').replace('.ogg', '.wav')) 

#### Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio using librosa.

In [7]:
def load_wav_16k_mono(filename):
    """
    Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio using librosa.
    
    Args:
        filename (str): Path to the WAV file.
    
    Returns:
        numpy.ndarray: Resampled audio data as a 1D numpy array.
    """
    # Load the audio file
    audio, _ = librosa.load(filename, sr=Config.sample_rate, mono=True)
    return audio.astype(np.float32)

In [8]:
### Test the function
df.filepath[0]
wav = load_wav_16k_mono(df.filepath[0])
wav

array([ 1.3737008e-06,  9.2573464e-07, -2.7166680e-06, ...,
        6.2165782e-06, -1.4831312e-06, -3.4761615e-06], dtype=float32)

In [None]:

_ = plt.plot(wav)

# Play the audio file.
ipd.Audio(wav, rate=16000)

#### Load the audio files and retrieve embeddings
Here you'll apply the load_wav_16k_mono and prepare the WAV data for the model.

When extracting embeddings from the WAV data, you get an array of shape (N, 1024) where N is the number of frames that YAMNet found (one for every 0.48 seconds of audio).

Your model will use each frame as one input. Therefore, you need to create a new column that has one frame per row. You also need to expand the labels and the fold column to proper reflect these new rows.

The expanded fold column keeps the original values. You cannot mix frames because, when performing the splits, you might end up having parts of the same audio on different splits, which would make your validation and test steps less effective.

In [11]:
# Load YamNet model
yamnet_model = hub.load(Config.preset)

## Build a decoder parse files into embeddings🚀 

**The build_decoder() function constructs a decoder that can process audio files into embeddings.
It loads, normalizes, and converts the audio into embeddings.
If with_labels=True, it also converts labels into one-hot vectors.
The output is Yamnet Embeddings that can be used as input to CNNs.**

In [12]:
import tensorflow as tf
import numpy as np
import librosa

def build_decoder(with_labels=True, dim=1024):
    """
    Builds a function to decode and preprocess audio files into embeddings.
    
    Parameters:
    - with_labels (bool): Whether to return labels along with embeddings.
    - dim (int): Target audio length (number of samples).
    
    Returns:
    - Function to decode audio files (with or without labels).
    """

    def get_audio(filepath):
        """Loads and decodes an audio file from a given filepath using librosa."""
        def _load_audio(filepath):
            # Load the audio file using librosa
            audio, _ = librosa.load(filepath.numpy().decode('utf-8'), sr=Config.sample_rate, mono=True)
            return audio.astype(np.float32)
        # Use tf.py_function to wrap the librosa call
        audio = tf.py_function(_load_audio, [filepath], tf.float32)
        audio.set_shape([None])  # Set shape to [None] since the length may vary
        return audio

    # def crop_or_pad(audio, target_len, pad_mode="constant"):
    #     """Ensures the audio is of fixed length by either cropping or padding."""
        
    #     audio_len = tf.shape(audio)[0]  # Get current length of audio
    #     diff_len = abs(target_len - audio_len)  # Difference from target length
        
    #     audio = audio[:48000]
    #     zero_padding = tf.zeros([48000] - tf.shape(wav), dtype=tf.float32)
    #     audio = tf.concat([zero_padding, audio],0)

    #     # if audio_len < target_len:
    #     #     # If audio is shorter, pad it randomly on both sides
    #     #     pad1 = tf.random.uniform([], maxval=diff_len, dtype=tf.int32)
    #     #     pad2 = diff_len - pad1
    #     #     audio = tf.pad(audio, paddings=[[pad1, pad2]], mode=pad_mode)

    #     # elif audio_len > target_len:
    #     #     # If audio is longer, randomly crop a section
    #     #     idx = tf.random.uniform([], maxval=diff_len, dtype=tf.int32)
    #     #     audio = audio[idx : (idx + target_len)]
    #     return audio
    #     # return tf.reshape(audio, [target_len])  # Ensure fixed shape
    def crop_or_pad(audio, target_len, pad_mode="constant"):
        """
        Ensures the audio is of fixed length by either cropping or padding.
        
        Args:
            audio (tf.Tensor): Input audio tensor of shape [None].
            target_len (int): Target length of the audio.
            pad_mode (str): Padding mode (e.g., "constant", "reflect").
            
        Returns:
            tf.Tensor: Audio tensor of shape [target_len].
        """
        audio_len = tf.shape(audio)[0]  # Get current length of audio
        
        if audio_len < target_len:
            # If audio is shorter, pad it with zeros (or other padding mode)
            padding_len = target_len - audio_len
            pad1 = padding_len // 2  # Pad half on the left
            pad2 = padding_len - pad1  # Pad the rest on the right
            audio = tf.pad(audio, paddings=[[pad1, pad2]], mode=pad_mode)
        elif audio_len > target_len:
            # If audio is longer, crop the center portion
            start = (audio_len - target_len) // 2
            audio = audio[start : start + target_len]
        else:
            # If audio is already the target length, do nothing
            pass
        
        # Ensure the output has the exact target length
        audio = audio[:target_len]
        return audio

    def get_target(target):
        """Converts a label into a one-hot encoded vector."""
        target = tf.reshape(target, [1])  # Reshape to single element tensor
        target = tf.cast(tf.one_hot(target, Config.num_classes), tf.float32)  # One-hot encoding
        return tf.reshape(target, [Config.num_classes])  # Reshape to match the output format

    def decode(path):
        """Processes an audio file into a spectrogram image."""
        # Load and preprocess the audio
        audio = get_audio(path)
        audio = crop_or_pad(audio, 48000)  # Ensure fixed length
        
        # Convert audio to YamNet embeddings
        scores, embeddings, spectrogram = yamnet_model(audio)
        embeddings = tf.reduce_mean(embeddings, axis=0)
        embeddings.set_shape([dim])
        return embeddings

    def decode_with_labels(path, label):
        """Processes an audio file into a spectrogram and returns it with its label."""
        return decode(path), get_target(label)

    return decode_with_labels if with_labels else decode


In [13]:
seed = 42
def build_dataset(
    paths, 
    labels=None, 
    batch_size=32,
    decode_fn=None, 
    cache=True,
    shuffle=2048
):
    """
    Builds a TensorFlow dataset pipeline for audio processing.

    Args:
        paths (list or tf.Tensor): List of file paths to audio files.
        labels (list or tf.Tensor, optional): Corresponding labels for classification. Defaults to None.
        batch_size (int, optional): Number of samples per batch. Defaults to 32.
        decode_fn (function, optional): Function to decode audio files. Defaults to None.
        cache (bool, optional): Whether to cache the dataset in memory. Defaults to True.
        shuffle (int or bool, optional): Buffer size for shuffling. Set to False to disable shuffling. Defaults to 2048.

    Returns:
        tf.data.Dataset: Preprocessed dataset ready for training.
    """

    # Use default decoder if none is provided
    if decode_fn is None:
        decode_fn = build_decoder(with_labels=(labels is not None), dim=Config.audio_len)


    # Set automatic tuning for dataset performance optimization
    AUTO = tf.data.experimental.AUTOTUNE

    # Create dataset from file paths (with or without labels)
    slices = (paths,) if labels is None else (paths, labels)
    print(f"Labels: {labels}")
    ds = tf.data.Dataset.from_tensor_slices(slices)

    # Apply decoding function to process audio files
    ds = ds.map(decode_fn, num_parallel_calls=AUTO)

    # Cache dataset in memory to speed up subsequent iterations
    if cache:
        ds = ds.cache()

    # Shuffle dataset if required
    if shuffle:
        opt = tf.data.Options()
        ds = ds.shuffle(shuffle, seed=seed)  # Shuffle with seed for reproducibility
        opt.experimental_deterministic = False  # Improve performance by allowing non-deterministic order
        ds = ds.with_options(opt)

    # Batch dataset with a fixed size, ensuring even batch sizes
    ds = ds.batch(batch_size, drop_remainder=True)

    # Prefetch data to improve training performance
    ds = ds.prefetch(AUTO)

    return ds


In [14]:
### Split the dataset to train, validation and test sets
from sklearn.model_selection import train_test_split
train_df, valid_df = train_test_split(df, test_size=0.2)
## spliet the validation set into validation and test sets
valid_df, test_df = train_test_split(valid_df, test_size=0.5)
print(f"Num Train: {len(train_df)} | Num Valid: {len(valid_df)} | Num Test: {len(test_df)}")


Num Train: 19567 | Num Valid: 2446 | Num Test: 2446


In [15]:
# Prepare training dataset
train_paths = train_df.filepath.values  # Extract file paths from training DataFrame
train_labels = train_df.target.values   # Extract corresponding labels


train_ds = build_dataset(
    paths=train_paths, 
    labels=train_labels, 
    batch_size=1,
    shuffle=True,  # Enable shuffling for training dataset
)

# Prepare validation dataset
valid_paths = valid_df.filepath.values  # Extract file paths from validation DataFrame
valid_labels = valid_df.target.values   # Extract corresponding labels

valid_ds = build_dataset(
    paths=valid_paths, 
    labels=valid_labels, 
    batch_size=1,
    shuffle=False,  # No shuffling for validation to ensure consistency
)

# Prepare test dataset
test_paths = test_df.filepath.values  # Extract file paths from test DataFrame
test_labels = test_df.target.values   # Extract corresponding labels

test_ds = build_dataset(
    paths=test_paths, 
    labels=test_labels, 
    batch_size=1,
    shuffle=False,  # No shuffling for test to ensure consistency
)

Labels: [167  45 105 ... 129  70 134]
Labels: [143  62  78 ...   0 106  57]
Labels: [106 132 167 ...  17 107  82]


In [16]:
## Preview the shape of the training dataset
for batch in train_ds.take(1):
	print(batch[0].shape, batch[1].shape)

(1, 1024) (1, 182)


2025-04-02 20:28:27.402749: W tensorflow/core/kernels/data/cache_dataset_ops.cc:914] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.
2025-04-02 20:28:27.428745: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [17]:
## Create a model using Keras
import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.pyplot as plt

# Define the model
def create_model(input_shape, num_classes):
    model = tf.keras.Sequential([
        layers.Input(shape=input_shape, name="input_embedding"),
        layers.Dense(512, activation="relu", name="dense_1"),
        layers.Dropout(0.5, name="dropout_1"),
        layers.Dense(256, activation="relu", name="dense_2"),
        layers.Dropout(0.5, name="dropout_2"),
        layers.Dense(num_classes, activation="softmax", name="output")
    ])
    return model

# Define input shape and number of classes
input_shape = (1024,)  # YAMNet embeddings are 1024-dimensional
num_classes = Config.num_classes  # Number of bird species

# Create the model
model = create_model(input_shape, num_classes)

# Compile the model
model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"]
)

# Print model summary
model.summary()

# Train the model
history = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=10,
    verbose=1
)

# # Evaluate the model on the test dataset
# test_loss, test_accuracy = model.evaluate(test_ds, verbose=1)
# print(f"Test Loss: {test_loss}")
# print(f"Test Accuracy: {test_accuracy}")

# # Save the model
# model.save("bird_classification_model.h5")
# print("Model saved to bird_classification_model.h5")

# # Plot training and validation loss
# plt.plot(history.history["loss"], label="Training Loss")
# plt.plot(history.history["val_loss"], label="Validation Loss")
# plt.xlabel("Epoch")
# plt.ylabel("Loss")
# plt.legend()
# plt.title("Training and Validation Loss")
# plt.show()

# # Plot training and validation accuracy
# plt.plot(history.history["accuracy"], label="Training Accuracy")
# plt.plot(history.history["val_accuracy"], label="Validation Accuracy")
# plt.xlabel("Epoch")
# plt.ylabel("Accuracy")
# plt.legend()
# plt.title("Training and Validation Accuracy")
# plt.show()

Epoch 1/10
[1m19567/19567[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m132s[0m 7ms/step - accuracy: 0.0328 - loss: 4.9274 - val_accuracy: 0.0666 - val_loss: 4.4343
Epoch 2/10
[1m19567/19567[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 2ms/step - accuracy: 0.0467 - loss: 4.6400 - val_accuracy: 0.0446 - val_loss: 4.4930
Epoch 3/10
[1m12760/19567[0m [32m━━━━━━━━━━━━━[0m[37m━━━━━━━[0m [1m13s[0m 2ms/step - accuracy: 0.0455 - loss: 4.6022

KeyboardInterrupt: 

In [None]:
## Fit the model
# Build the model
model = build_model(num_classes=Config.num_classes)
history = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=100,
    callbacks=[EarlyStopping(patience=3)],  # Stop early if validation loss doesn't improve
)

In [None]:
# Encode labels
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
label_encoder = LabelEncoder()
labels = df.primary_label
file_paths = df.filepath
encoded_labels = label_encoder.fit_transform(labels)
num_classes = len(label_encoder.classes_)

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(file_paths, encoded_labels, test_size=0.2, random_state=42)

In [None]:
def crop_or_pad(audio, target_len, pad_mode="constant"):
        """
        Ensures the audio is of fixed length by either cropping or padding.
        
        Args:
            audio (tf.Tensor): Input audio tensor of shape [None].
            target_len (int): Target length of the audio.
            pad_mode (str): Padding mode (e.g., "constant", "reflect").
            
        Returns:
            tf.Tensor: Audio tensor of shape [target_len].
        """
        audio_len = tf.shape(audio)[0]  # Get current length of audio
        
        if audio_len < target_len:
            # If audio is shorter, pad it with zeros (or other padding mode)
            padding_len = target_len - audio_len
            pad1 = padding_len // 2  # Pad half on the left
            pad2 = padding_len - pad1  # Pad the rest on the right
            audio = tf.pad(audio, paddings=[[pad1, pad2]], mode=pad_mode)
        elif audio_len > target_len:
            # If audio is longer, crop the center portion
            start = (audio_len - target_len) // 2
            audio = audio[start : start + target_len]
        else:
            # If audio is already the target length, do nothing
            pass
        
        # Ensure the output has the exact target length
        audio = audio[:target_len]
        return audio

In [None]:

# Function to extract embeddings
def extract_embedding(file_path):
    # Load and preprocess audio using librosa
    ## Sr is the sampling rate of the audio file. 
    waveform, sample_rate = librosa.load(file_path, sr=16000, mono=True)
    waveform = tf.convert_to_tensor(waveform, dtype=tf.float32)
    # waveform = crop_or_pad(waveform, 48000)  # Ensure fixed length ## 48000 is the number of seconds in the audio file
    # Extract embeddings
    scores, embeddings, spectrogram = yamnet_model(waveform)
    return embeddings.numpy().mean(axis=0)  # Average embeddings over time

# Extract embeddings for training and testing data
X_train_embeddings = np.array([extract_embedding(file_path) for file_path in X_train])
X_test_embeddings = np.array([extract_embedding(file_path) for file_path in X_test])

In [None]:
from tensorflow.keras import layers, models

# Define the classifier model
audio_embedding_model = models.Sequential([
    layers.Input(shape=(X_train_embeddings.shape[1],)),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation='softmax')
])

# Compile the model
audio_embedding_model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train the model
history = audio_embedding_model.fit(X_train_embeddings, y_train,
                    epochs=100,
                    batch_size=32,
                    validation_data=(X_test_embeddings, y_test))

In [None]:
# Evaluate the model
test_loss, test_accuracy = audio_embedding_model.evaluate(X_test_embeddings, y_test)
print(f"Test Accuracy: {test_accuracy:.4f}")

# Predict on new data
# def predict_bird(file_path):
#     embedding = extract_embedding(file_path)
#     prediction = audio_embedding_model.predict(embedding[np.newaxis, ...])
#     predicted_label = label_encoder.inverse_transform([np.argmax(prediction)])
#     return predicted_label[0]

# Example usage
# print(predict_bird('path_to_new_wav_file.wav'))

In [None]:
## Use Logistic Regression
from sklearn.linear_model import LogisticRegression
classifier = LogisticRegression(max_iter=1000)
classifier.fit(X_train_embeddings, y_train)
accuracy = classifier.score(X_test_embeddings, y_test)
print(f"Accuracy: {accuracy:.4f}")

In [None]:
## Try using another embedding extraction method
from panns_inference import AudioTagging
import librosa
import numpy as np

In [None]:
# Load PANNs model
model = AudioTagging(checkpoint_path=None, device='cuda')

In [None]:
# Extract embeddings for your dataset
X_train_embeddings = np.array([extract_embedding(file_path) for file_path in X_train])
X_test_embeddings = np.array([extract_embedding(file_path) for file_path in X_test])

In [None]:
# Train a classifier
from sklearn.svm import SVC
classifier = SVC()
classifier.fit(X_train_embeddings, y_train)

# Evaluate
accuracy = classifier.score(X_test_embeddings, y_test)
print(f"Accuracy: {accuracy:.4f}")