ECE4191 - Team E04

## Multi-Category Model


### Single-label detection (one dominant class per clip)

In [None]:
# Install TensorFlow at a pinned version; tensorflow-io is upgraded but left unpinned.
!pip install -q --upgrade tensorflow==2.17.0 tensorflow-io
# Force-reinstall pinned NumPy and pandas builds.
# NOTE(review): the runtime may need a restart after this reinstall before the imports below pick it up — confirm.
!pip install numpy==1.26.4 pandas==2.1.4 --force-reinstall

In [None]:
import os
import glob
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_io as tfio
import matplotlib.pyplot as plt
from IPython import display
from sklearn.model_selection import train_test_split
from google.colab import files
import zipfile

In [None]:
@tf.function
def load_wav_16k_mono(filename):
  """Read the WAV file at `filename` and return its audio as a mono waveform resampled to 16 kHz."""
  raw_bytes = tf.io.read_file(filename)
  # desired_channels=1 requests a single channel; the trailing channel axis is squeezed away below.
  audio, native_rate = tf.audio.decode_wav(raw_bytes, desired_channels=1)
  mono = tf.squeeze(audio, axis=-1)
  # The resampler is given the source rate as int64.
  native_rate = tf.cast(native_rate, dtype=tf.int64)
  return tfio.audio.resample(mono, rate_in=native_rate, rate_out=16000)

def extract_embedding(wav_data, label):
  """Run YAMNet on a waveform and pair each embedding frame with the clip's label.

  Returns a tuple (embeddings, labels) where `labels` repeats `label` once per
  embedding frame, so the pair can be unbatched into per-frame examples.
  """
  # YAMNet returns (scores, embeddings, spectrogram); only the embeddings are used here.
  _, frame_embeddings, _ = yamnet_model(wav_data)
  frame_count = tf.shape(frame_embeddings)[0]
  frame_labels = tf.repeat(label, frame_count)
  return (frame_embeddings, frame_labels)

In [None]:
# TF Hub handle of the YAMNet model (version 1).
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
# Loaded once and kept as a global: extract_embedding and the inference cell both call it.
yamnet_model = hub.load(yamnet_model_handle)

In [None]:
import os
import zipfile

# Directory that holds the uploaded archives and receives their contents.
ARCHIVE_DIR = '/content/augmented_data'

# One archive per class, stored as <ARCHIVE_DIR>/augmented_<name>.zip.
ARCHIVE_NAMES = [
    'kookaburra', 'bat', 'cockatoo', 'crocodile', 'dingo', 'duck', 'frog', 'koala',
    'magpie', 'platypus', 'possum', 'snake', 'tawnyfrogmouth', 'wombat', 'background',
]

for archive_name in ARCHIVE_NAMES:
    archive_path = os.path.join(ARCHIVE_DIR, f'augmented_{archive_name}.zip')
    if not os.path.exists(archive_path):
        # Keep going, as the shell version did, but make the gap visible.
        print(f'WARNING: archive not found, skipping: {archive_path}')
        continue
    # extractall overwrites files that already exist, so the cell can be re-run
    # without stopping at unzip's interactive "replace?" prompt.
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(ARCHIVE_DIR)

In [None]:
import glob
import pandas as pd
import os

my_classes = ['kookaburra', 'bat', 'cockatoo', 'crocodile', 'dingo', 'duck', 'frog', 'koala', 'magpie', 'platypus', 'possum', 'snake', 'tawnyfrogmouth', 'wombat', 'background']  # Multi-class
base_data_path = '/content/augmented_data/'


def match_class(basename, classes):
    """Return the class whose name occurs in `basename`, or None when none does.

    If several class names occur, the longest one wins. Taking the first hit
    instead would label 'augmented_wombat_3.wav' as 'bat', because 'bat' is a
    substring of 'wombat' and comes earlier in the class list.
    """
    candidates = [c for c in classes if c in basename]
    return max(candidates, key=len) if candidates else None


# Recursive to handle subfolders; sorted so the seeded split below sees a stable order.
filenames = sorted(glob.glob(base_data_path + '**/*.wav', recursive=True))

targets = []
valid_filenames = []
for f in filenames:
    matching_c = match_class(os.path.basename(f).lower(), my_classes)
    if matching_c is not None:
        valid_filenames.append(f)
        # The integer target is the class's position in my_classes.
        targets.append(my_classes.index(matching_c))

my_pd_data = pd.DataFrame({'filename': valid_filenames, 'target': targets}).astype({'filename': 'object'})
print(len(valid_filenames))  # Debug: Should be combined total from all classes
my_pd_data.head()

In [None]:
# Split filenames first (fast)
from sklearn.model_selection import train_test_split

filenames = my_pd_data['filename'].tolist()
targets = my_pd_data['target'].tolist()

# 20% held out for test, then 12.5% of the remainder for validation (10% of the total).
train_files, test_files, train_targets, test_targets = train_test_split(filenames, targets, test_size=0.2, stratify=targets, random_state=42)
train_files, val_files, train_targets, val_targets = train_test_split(train_files, train_targets, test_size=0.125, stratify=train_targets, random_state=42)


def make_embedding_dataset(file_paths, labels, shuffle=False):
    """Build a batched dataset of (YAMNet embedding frame, label) pairs.

    Each file is loaded, embedded, and split into one example per embedding
    frame. The per-frame examples are cached so the audio is only processed
    during the first pass.

    Args:
        file_paths: list of WAV file paths.
        labels: list of integer class ids, aligned with `file_paths`.
        shuffle: when True, shuffle frames before batching. Without this, all
            frames of a clip arrive back to back and a batch of 32 holds
            frames from only one or two clips.
    """
    ds = (
        tf.data.Dataset.from_tensor_slices((file_paths, labels))
        .map(lambda f, l: (load_wav_16k_mono(f), l))
        .map(extract_embedding)
        .unbatch()
        .cache()
    )
    if shuffle:
        # Shuffle after cache() so every epoch is reshuffled from the cached frames.
        ds = ds.shuffle(1000, seed=42)
    return ds.batch(32).prefetch(tf.data.AUTOTUNE)


# Only the training set is shuffled; evaluation order does not matter.
train_ds = make_embedding_dataset(train_files, train_targets, shuffle=True)
val_ds = make_embedding_dataset(val_files, val_targets)
test_ds = make_embedding_dataset(test_files, test_targets)

In [None]:
# Classifier head on top of YAMNet embeddings: one hidden layer, then one logit per class.
embedding_input = tf.keras.layers.Input(shape=(1024,), dtype=tf.float32, name='input_embedding')
hidden_layer = tf.keras.layers.Dense(512, activation='relu')
logits_layer = tf.keras.layers.Dense(len(my_classes))  # Multi-class output (no activation: raw logits)

my_model = tf.keras.Sequential([embedding_input, hidden_layer, logits_layer], name='animal_model')

my_model.summary()

# from_logits=True matches the activation-free output layer above.
my_model.compile(
    optimizer="adam",
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"],
)

In [None]:
# Stop on validation loss rather than training loss: training loss keeps falling
# while the model overfits, so monitoring it neither stops training early nor
# restores the weights that generalise best.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

history = my_model.fit(train_ds,
                       epochs=20,
                       validation_data=val_ds,
                       callbacks=[callback])

In [None]:
# Loss plot
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)

fig, ax = plt.subplots()
ax.plot(epochs, loss, 'bo', label='Training loss')
ax.plot(epochs, val_loss, 'b', label='Validation loss')
# Axis labels let the figure be read on its own.
ax.set(title='Training and validation loss', xlabel='Epoch', ylabel='Loss')
ax.legend()
plt.show()

# Accuracy plot
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

fig, ax = plt.subplots()
ax.plot(epochs, acc, 'bo', label='Training acc')
ax.plot(epochs, val_acc, 'b', label='Validation acc')
ax.set(title='Training and validation accuracy', xlabel='Epoch', ylabel='Accuracy')
ax.legend()
plt.show()

# Test evaluation (per-frame metrics on the held-out split)
test_results = my_model.evaluate(test_ds, return_dict=True)
print(test_results)

In [None]:
# Upload and test new file
import tempfile

uploaded = files.upload()
if not uploaded:
    raise RuntimeError('No file was uploaded.')
upload_name, upload_bytes = next(iter(uploaded.items()))

# Colab can save a re-uploaded file under a de-duplicated name such as
# "possum (2).wav", so the dict key is not guaranteed to be the path of the file
# just uploaded. Writing the returned bytes to a dedicated file avoids reading a
# stale copy.
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
    tmp.write(upload_bytes)
new_file = tmp.name

waveform = load_wav_16k_mono(new_file)
scores, embeddings, spectrogram = yamnet_model(waveform)
result = my_model(embeddings).numpy()  # Per-frame logits
# Average the logits over all frames, then turn the clip-level logits into probabilities.
probs = tf.nn.softmax(result.mean(axis=0)).numpy()
inferred_idx = int(np.argmax(probs))
inferred_class = my_classes[inferred_idx]
top_prob = float(probs[inferred_idx])

print(f'The main sound is: {inferred_class} (probability: {top_prob:.2f})')



Saving possum.wav to possum (2).wav
The main sound is: bat (probability: 0.89)


### Simultaneous detection (multi-label: several animals per clip)

In [None]:
# Same installs as the first section of this notebook; only needed when this section runs in a fresh runtime.
# TensorFlow is pinned; tensorflow-io is upgraded but left unpinned.
!pip install -q --upgrade tensorflow==2.17.0 tensorflow-io
# Force-reinstall pinned NumPy and pandas builds.
!pip install numpy==1.26.4 pandas==2.1.4 --force-reinstall

In [None]:
import os
import glob
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_io as tfio
import matplotlib.pyplot as plt
from IPython import display
from sklearn.model_selection import train_test_split
from google.colab import files
import zipfile

In [None]:
@tf.function
def load_wav_16k_mono(filename):
  """Load a WAV file and return a single-channel float waveform resampled to 16 kHz."""
  # decode_wav returns an (audio, sample_rate) pair; one channel is requested.
  decoded = tf.audio.decode_wav(tf.io.read_file(filename), desired_channels=1)
  # Drop the channel axis so the waveform is one-dimensional.
  samples = tf.squeeze(decoded.audio, axis=-1)
  source_rate = tf.cast(decoded.sample_rate, dtype=tf.int64)
  resampled = tfio.audio.resample(samples, rate_in=source_rate, rate_out=16000)
  return resampled

def extract_embedding(wav_data, label):
  """Run YAMNet on a waveform and give every embedding frame a copy of the label vector.

  `label` gains a leading axis and is repeated along it, so the result has one
  label row per embedding frame.
  """
  # YAMNet returns (scores, embeddings, spectrogram); only the embeddings are used here.
  _, frame_embeddings, _ = yamnet_model(wav_data)
  frame_count = tf.shape(frame_embeddings)[0]
  label_row = tf.expand_dims(label, axis=0)
  frame_labels = tf.repeat(label_row, frame_count, axis=0)  # Repeat multi-hot vector
  return (frame_embeddings, frame_labels)

In [None]:
# TF Hub handle of the YAMNet model (version 1).
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
# Loaded once and kept as a global: extract_embedding and the inference cell both call it.
yamnet_model = hub.load(yamnet_model_handle)

In [None]:
import os
import zipfile

# Directory that holds the uploaded archives and receives their contents.
ARCHIVE_DIR = '/content/augmented_data'

# One archive per class, stored as <ARCHIVE_DIR>/augmented_<name>.zip.
ARCHIVE_NAMES = [
    'kookaburra', 'bat', 'cockatoo', 'crocodile', 'dingo', 'duck', 'frog', 'koala',
    'magpie', 'platypus', 'possum', 'snake', 'tawnyfrogmouth', 'wombat', 'background',
]

for archive_name in ARCHIVE_NAMES:
    archive_path = os.path.join(ARCHIVE_DIR, f'augmented_{archive_name}.zip')
    if not os.path.exists(archive_path):
        # Keep going, as the shell version did, but make the gap visible.
        print(f'WARNING: archive not found, skipping: {archive_path}')
        continue
    # extractall overwrites files that already exist, so the cell can be re-run
    # without stopping at unzip's interactive "replace?" prompt.
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(ARCHIVE_DIR)

In [None]:
import glob
import pandas as pd
import os
import random  # Ensure imported

my_classes = ['kookaburra', 'bat', 'cockatoo', 'crocodile', 'dingo', 'duck', 'frog', 'koala', 'magpie', 'platypus', 'possum', 'snake', 'tawnyfrogmouth', 'wombat', 'background']
base_data_path = '/content/augmented_data/'

# Group files by class (dict for mixing).
# Assumes each class was extracted into its own subfolder, e.g. augmented_kookaburra/.
# Sorted so that seeded random draws from these lists are reproducible.
class_files = {
    c: sorted(glob.glob(os.path.join(base_data_path, f'augmented_{c}', '*.wav')))
    for c in my_classes
}

print({c: len(paths) for c, paths in class_files.items()})  # Debug totals

# A class with no files would only fail later, inside random.choice, with an
# unhelpful IndexError. Surface it here instead.
empty_classes = [c for c, paths in class_files.items() if not paths]
if empty_classes:
    print(f"WARNING: no .wav files found for: {', '.join(empty_classes)}")

In [None]:
from sklearn.model_selection import train_test_split

# Mixing parameters (tune these)
mix_prob = 0.5  # Probability a sample is mixed
max_mixes = 2   # Max additional animals per mix

# Function to generate mixed samples (files + multi-hot labels)
def generate_mixed_samples(class_files, my_classes, samples_per_class=1000, seed=None):  # Increase for larger dataset
    """Draw training samples, each made of one primary clip plus optional overlaid clips.

    For every class, `samples_per_class` samples are drawn. With probability
    `mix_prob` (module-level) a sample also gets between 1 and `max_mixes`
    (module-level) clips from other, non-background classes.

    Args:
        class_files: dict mapping class name to a list of file paths.
        my_classes: ordered class names; a class's position is its label index.
        samples_per_class: number of samples drawn with each class as primary.
        seed: optional int. When given, a private random generator seeded with
            it is used, so the output is reproducible. When None, the global
            `random` module state is used, as before.

    Returns:
        (all_files, all_labels): `all_files[i]` is the list of file paths for
        sample i (primary clip first); `all_labels[i]` is a float32 multi-hot
        vector of length len(my_classes).

    Raises:
        ValueError: if samples are requested and a class has no files.
    """
    rng = random if seed is None else random.Random(seed)

    if samples_per_class > 0:
        missing = [c for c in my_classes if not class_files.get(c)]
        if missing:
            raise ValueError(f"No audio files available for class(es): {', '.join(missing)}")

    all_files = []  # List of [file1, file2, ...] for each sample
    all_labels = []  # Multi-hot np arrays
    for c in my_classes:
        class_idx = my_classes.index(c)
        # Classes that may be overlaid on `c`: every other class except background.
        candidates = [oc for oc in my_classes if oc != c and oc != 'background']
        for _ in range(samples_per_class):
            other_classes = []
            if rng.random() < mix_prob and candidates:
                # Clamp so random.sample cannot ask for more classes than exist.
                num_other = min(rng.randint(1, max_mixes), len(candidates))
                other_classes = rng.sample(candidates, num_other)

            sample_files = [rng.choice(class_files[c])]
            sample_files += [rng.choice(class_files[oc]) for oc in other_classes]

            label = np.zeros(len(my_classes), dtype=np.float32)
            label[class_idx] = 1.0
            for oc in other_classes:
                label[my_classes.index(oc)] = 1.0

            all_files.append(sample_files)
            all_labels.append(label)
    return all_files, all_labels

# Generate (seeded so the sampled mixtures are the same on every run)
random.seed(42)
all_files, all_labels = generate_mixed_samples(class_files, my_classes)

# NOTE(review): samples are drawn with replacement and split afterwards, so the
# same clip can end up in both train and test. Test metrics are therefore
# likely optimistic; splitting the source files per class before mixing would
# avoid this.

# Split
train_files, test_files, train_labels, test_labels = train_test_split(all_files, all_labels, test_size=0.2, random_state=42)  # No stratify needed for multi-label
train_files, val_files, train_labels, val_labels = train_test_split(train_files, train_labels, test_size=0.125, random_state=42)

# Samples hold a varying number of file paths. from_tensor_slices needs a
# rectangular array, so every sample is padded with '' up to this width.
files_per_sample = max((len(sample) for sample in all_files), default=1)


def pad_file_lists(samples, width):
    """Right-pad each list of paths with empty strings to exactly `width` entries."""
    return [list(sample) + [''] * (width - len(sample)) for sample in samples]


# Function to load and mix waveforms
@tf.function
def load_and_mix(files, label):
    """Load the clips listed in `files`, overlay them, and return (waveform, label).

    Args:
        files: 1-D collection of WAV paths with a statically known length.
            Empty strings are padding slots and contribute no audio.
        label: the sample's label, returned unchanged (already multi-hot).

    Returns:
        The clips zero-padded to the longest one, summed, and divided by the
        number of real (non-padding) clips, together with `label`.
    """
    files = tf.convert_to_tensor(files, dtype=tf.string)
    is_real = tf.strings.length(files) > 0

    waveforms = []
    for path, present in zip(tf.unstack(files), tf.unstack(is_real)):
        # Padding slots become an empty waveform; the file is only read for real paths.
        wav = tf.cond(
            present,
            lambda p=path: load_wav_16k_mono(p),
            lambda: tf.zeros([0], dtype=tf.float32),
        )
        waveforms.append(tf.ensure_shape(wav, [None]))

    # Pad to max length
    max_len = tf.reduce_max([tf.shape(w)[0] for w in waveforms])
    padded = [tf.pad(w, [[0, max_len - tf.shape(w)[0]]]) for w in waveforms]
    # Mix: Sum and normalize by the number of real clips (simple overlay; adjust volumes optionally)
    num_real = tf.maximum(tf.reduce_sum(tf.cast(is_real, tf.float32)), 1.0)
    mixed = tf.add_n(padded) / num_real
    return mixed, label  # Label is already multi-hot


def make_mixed_embedding_dataset(file_lists, labels, shuffle=False):
    """Build a batched dataset of (embedding frame, multi-hot label) pairs from mixed samples.

    Args:
        file_lists: list of per-sample path lists (varying length).
        labels: list of multi-hot label vectors aligned with `file_lists`.
        shuffle: when True, shuffle frames before batching so that a batch is
            not made of consecutive frames from one or two clips.
    """
    ds = (
        tf.data.Dataset.from_tensor_slices((pad_file_lists(file_lists, files_per_sample), np.stack(labels)))
        .map(load_and_mix)
        .map(extract_embedding)
        .unbatch()
        .cache()
    )
    if shuffle:
        # Shuffle after cache() so every epoch is reshuffled from the cached frames.
        ds = ds.shuffle(1000, seed=42)
    return ds.batch(32).prefetch(tf.data.AUTOTUNE)


# Datasets: Map to mix, then extract embeddings
train_ds = make_mixed_embedding_dataset(train_files, train_labels, shuffle=True)
val_ds = make_mixed_embedding_dataset(val_files, val_labels)
test_ds = make_mixed_embedding_dataset(test_files, test_labels)

In [None]:
# Multi-label head on top of YAMNet embeddings: one hidden layer, then one sigmoid unit per class.
embedding_input = tf.keras.layers.Input(shape=(1024,), dtype=tf.float32, name='input_embedding')
hidden_layer = tf.keras.layers.Dense(512, activation='relu')
probability_layer = tf.keras.layers.Dense(len(my_classes), activation='sigmoid')  # Sigmoid for multi-label

my_model = tf.keras.Sequential([embedding_input, hidden_layer, probability_layer], name='animal_model')

my_model.summary()

# The output layer already applies a sigmoid, so the loss is given probabilities
# (BinaryCrossentropy's default) rather than logits.
multi_label_metrics = [
    tf.keras.metrics.BinaryAccuracy(),
    tf.keras.metrics.Precision(),
    tf.keras.metrics.Recall(),
]
my_model.compile(
    optimizer="adam",
    loss=tf.keras.losses.BinaryCrossentropy(),  # Binary for multi-label
    metrics=multi_label_metrics,
)

In [None]:
# Stop on validation loss rather than training loss: training loss keeps falling
# while the model overfits, so monitoring it neither stops training early nor
# restores the weights that generalise best.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

history = my_model.fit(train_ds,
                       epochs=20,
                       validation_data=val_ds,
                       callbacks=[callback])

In [None]:
# Loss plot
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)

fig, ax = plt.subplots()
ax.plot(epochs, loss, 'bo', label='Training loss')
ax.plot(epochs, val_loss, 'b', label='Validation loss')
# Axis labels let the figure be read on its own.
ax.set(title='Training and validation loss', xlabel='Epoch', ylabel='Loss')
ax.legend()
plt.show()

# Binary Accuracy plot (update key if needed)
acc = history.history['binary_accuracy']
val_acc = history.history['val_binary_accuracy']

fig, ax = plt.subplots()
ax.plot(epochs, acc, 'bo', label='Training acc')
ax.plot(epochs, val_acc, 'b', label='Validation acc')
ax.set(title='Training and validation accuracy', xlabel='Epoch', ylabel='Binary accuracy')
ax.legend()
plt.show()

# Test evaluation (per-frame metrics on the held-out split)
test_results = my_model.evaluate(test_ds, return_dict=True)
print(test_results)

In [None]:
# Upload and test new file
import tempfile

uploaded = files.upload()
if not uploaded:
    raise RuntimeError('No file was uploaded.')
upload_name, upload_bytes = next(iter(uploaded.items()))

# Colab can save a re-uploaded file under a de-duplicated name such as
# "possum (2).wav", so the dict key is not guaranteed to be the path of the file
# just uploaded. Writing the returned bytes to a dedicated file avoids reading a
# stale copy.
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
    tmp.write(upload_bytes)
new_file = tmp.name

waveform = load_wav_16k_mono(new_file)
scores, embeddings, spectrogram = yamnet_model(waveform)
# The model's output layer is a sigmoid, so these are per-frame probabilities,
# not logits. Applying sigmoid again would squash every score into (0.5, 0.73)
# and make every class pass the 0.5 threshold.
result = my_model(embeddings).numpy()
probs = result.mean(axis=0)  # Clip-level probability per class (mean over frames)
threshold = 0.5  # Tune this
detected_indices = np.where(probs > threshold)[0]
detected_classes = [my_classes[i] for i in detected_indices]
detected_probs = probs[detected_indices]

if len(detected_classes) == 0:
    print("No detections above threshold.")
else:
    print(f"Detected: {', '.join(detected_classes)} (probabilities: {', '.join(f'{p:.2f}' for p in detected_probs)})")