# **1. Prepare Data**

## **1.1 Basic Imports**

In [None]:
# Basic python imports
import os
import shutil
!pip install tabulate --quiet
from tabulate import tabulate

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

## **1.2 View the Audio Composition**

In [None]:
# Root directory of the Kaggle dataset
dataset_path = os.path.normpath("/kaggle/input/z-by-hp-unlocked-challenge-3-signal-processing")

# Column titles for the summary table
table_headers = ["Folder", "No. of Audio Files"]

# One row per entry in the dataset root: its full path and how many items it holds
table_data = [
    [entry_path, len(os.listdir(entry_path))]
    for entry_path in (os.path.join(dataset_path, name) for name in os.listdir(dataset_path))
]

# Render the summary as a grid table
print(tabulate(table_data, table_headers, tablefmt="grid"))

# **2. Install and Import Dependencies**

## **2.1 Install Dependencies**

In [None]:
# Install packages
!pip uninstall tensorflow --quiet --yes
!pip uninstall tensorflow-io --quiet --yes

!pip install tensorflow==2.10.0 tensorflow-io==0.27.0 --quiet

## **2.2 Load Dependencies**

In [None]:
# Import required packages
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_io as tfio

# **3. Build Data Loading Function**

## **3.1 Define Paths to Files**

In [None]:
# Define paths
CAPUCHIN_FILE = os.path.join(dataset_path, "Parsed_Capuchinbird_Clips", "XC3776-3.wav")
NON_CAPUCHIN_FILE = os.path.join(dataset_path, "Parsed_Not_Capuchinbird_Clips", "afternoon-birds-song-in-forest-26.wav")

In [None]:
# View paths
CAPUCHIN_FILE, NON_CAPUCHIN_FILE

## **3.2 Build a Data Loading Function**

In [None]:
# Function to load and process the data
def load_wav_16k_mono(filename):
    """Read a WAV file and return it as a single-channel waveform resampled to 16 kHz.

    Args:
        filename: Path of the WAV file to read.

    Returns:
        The decoded audio with its channel axis removed, resampled from the
        file's own sample rate to 16000 Hz.
    """
    # Read the raw bytes and decode them, requesting exactly one channel
    decoded, source_rate = tf.audio.decode_wav(tf.io.read_file(filename), desired_channels=1)

    # Drop the trailing (channel) axis
    mono = tf.squeeze(decoded, axis=-1)

    # The input rate is whatever the file declares (cast to int64), not a fixed value
    return tfio.audio.resample(mono, rate_in=tf.cast(source_rate, dtype=tf.int64), rate_out=16000)

## **3.3 Plot Wave**

In [None]:
# Get the wave data for the files
wave = load_wav_16k_mono(CAPUCHIN_FILE)
nwave = load_wav_16k_mono(NON_CAPUCHIN_FILE)

In [None]:
# Overlay both waveforms on a single set of axes
fig, ax = plt.subplots()
ax.plot(wave)
ax.plot(nwave)
ax.legend(["Capuchin", "Non Capuchin"])
ax.set_title("Audio Wave Visualization")
plt.show()

# **4. Create Tensorflow Dataset**

## **4.1 Define Paths to Positive and Negative Data**

In [None]:
# Define the paths
POS = os.path.join(dataset_path, "Parsed_Capuchinbird_Clips")
NEG = os.path.join(dataset_path, "Parsed_Not_Capuchinbird_Clips")

In [None]:
# Print the path
POS, NEG

## **4.2 Create Tensorflow Datasets**

In [None]:
# Prepare the dataset
POS_DF = tf.data.Dataset.list_files(os.path.join(POS, "*.wav"))
NEG_DF = tf.data.Dataset.list_files(os.path.join(NEG, "*.wav"))

## **4.3 Add Label and Combine Positive and Negative Samples**

In [None]:
# Build one label per file: 1.0 for Capuchinbird clips, 0.0 for all other clips
pos_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(POS_DF)))
neg_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(NEG_DF)))

# Pair every file path with its label
positives = tf.data.Dataset.zip((POS_DF, pos_labels))
negatives = tf.data.Dataset.zip((NEG_DF, neg_labels))

In [None]:
# Join the datasets
data = positives.concatenate(negatives)

# **5. Determine Average Length of a Capuchinbird Call**

## **5.1 Calculate Wave Cycle Length**

In [None]:
# Number of samples in every Capuchinbird clip after loading it at 16 kHz
lengths = [
    len(load_wav_16k_mono(os.path.join(POS, clip_name)))
    for clip_name in os.listdir(POS)
]

## **5.2 Calculate Mean, Min and Max**

In [None]:
# Imports
import pandas as pd

# Convert the lengths to a pandas Series
lengths = pd.Series(lengths)

In [None]:
# Stats for the lengths
lengths.describe()

In [None]:
# Stats for the time
(lengths / 16000).describe()

# **6. Build Preprocessing Function to Convert to Spectrogram**

## **6.1 Build Preprocessing Function**

In [None]:
# Function to convert to image
def preprocess(file_path, label):
    """Turn one labelled audio file into a fixed-size magnitude spectrogram.

    Args:
        file_path: Path of the WAV file to load.
        label: Label paired with the file; passed through unchanged.

    Returns:
        A ``(spectrogram, label)`` tuple, where the spectrogram is the STFT
        magnitude with an extra trailing axis added.
    """
    # Keep at most the first 56000 samples of the clip
    clip = load_wav_16k_mono(file_path)[:56000]

    # Shorter clips are left-padded with zeros up to exactly 56000 samples
    leading_zeros = tf.zeros([56000] - tf.shape(clip), dtype=tf.float32)
    padded = tf.concat([leading_zeros, clip], 0)

    # Short-time Fourier transform, reduced to its magnitude
    magnitude = tf.abs(tf.signal.stft(padded, frame_length=320, frame_step=32))

    # Append a trailing axis at position 2
    return tf.expand_dims(magnitude, axis=2), label

## **6.2 Test Out the Function and Viz the Spectrogram**

In [None]:
# Draw three Capuchinbird clips at random
num_images = 3
random_positives = positives.shuffle(buffer_size=10000).take(num_images)
pos_iter = random_positives.as_numpy_iterator()

# One subplot row per spectrogram
fig, axs = plt.subplots(num_images, 1, figsize=(30, 5 * num_images))

# Convert each sampled clip to a spectrogram and render it on its own axes
for ax in axs:
    clip_path, clip_label = next(pos_iter)
    spectrogram = preprocess(clip_path, clip_label)[0]

    # Transposing reverses the axes; [0] then leaves a 2-D image
    ax.imshow(tf.transpose(spectrogram)[0])
    ax.axis('off')

# Title goes on the top panel only
axs[0].set_title("Capuchin Bird Call")

# Show the plot
plt.show()

In [None]:
# Draw three non-Capuchinbird clips at random
num_images = 3
random_negatives = negatives.shuffle(buffer_size=10000).take(num_images)
neg_iter = random_negatives.as_numpy_iterator()

# One subplot row per spectrogram
fig, axs = plt.subplots(num_images, 1, figsize=(30, 5 * num_images))

# Convert each sampled clip to a spectrogram and render it on its own axes
for ax in axs:
    clip_path, clip_label = next(neg_iter)
    spectrogram = preprocess(clip_path, clip_label)[0]

    # Transposing reverses the axes; [0] then leaves a 2-D image
    ax.imshow(tf.transpose(spectrogram)[0])
    ax.axis('off')

# Title goes on the top panel only
axs[0].set_title("Non Capuchin Bird Call")

# Show the plot
plt.show()

# **7. Create Training and Testing Partitions**

## **7.1 Create a TensorFlow Data Pipeline**

In [None]:
# Pass the data through all the steps
data = data.map(preprocess)   # (path, label) -> (spectrogram, label)
data = data.cache()           # keep the computed spectrograms after the first pass

# Shuffle once and keep that order for every later iteration. With the default
# reshuffle_each_iteration=True a new permutation is drawn on every pass, so the
# take()/skip() train/test split made from this dataset would change each epoch
# and validation clips would leak into training. The trade-off is that batches
# are no longer reshuffled between epochs.
data = data.shuffle(buffer_size=1000, reshuffle_each_iteration=False)
data = data.batch(16)
data = data.prefetch(8)

## **7.2 Split into Training and Testing Partitions**

In [None]:
# Get the train and test sets
train_df = data.take(36)
test_df = data.skip(36).take(15)

## **7.3 View Sample Batch**

In [None]:
# # Get a sample
# samples, labels = train_df.as_numpy_iterator().next()

# # View the shape of samples
# samples.shape

# **8. Build Deep Learning Model**

## **8.1 Load Tensorflow Dependencies**

In [None]:
# Imports
from tensorflow.keras import *

## **8.2 Build Sequential Model, Compile and View Summary**

In [None]:
# Assemble the CNN as one ordered stack of layers
model = models.Sequential([
    # Convolutional block 1 - expects inputs of shape (1741, 257, 1)
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(1741, 257, 1)),
    layers.BatchNormalization(),
    layers.SpatialDropout2D(0.25),
    layers.MaxPooling2D((2, 2)),

    # Convolutional block 2
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.SpatialDropout2D(0.25),
    layers.MaxPooling2D((2, 2)),

    # Convolutional block 3
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.SpatialDropout2D(0.25),
    layers.MaxPooling2D((2, 2)),

    # Flatten the feature maps before the dense head
    layers.Flatten(),

    # Dense head with batch normalization and dropout for regularization
    layers.Dense(256, activation='relu'),
    layers.BatchNormalization(),
    layers.Dropout(0.5),

    # Single sigmoid unit for the binary decision
    layers.Dense(1, activation='sigmoid'),
])

In [None]:
# Adam optimizer with an initial learning rate of 1e-4
opt = optimizers.Adam(learning_rate=1e-4)

# Binary cross-entropy loss for the single sigmoid output
loss_function = losses.BinaryCrossentropy()

# Name the Precision/Recall metrics explicitly. Left unnamed, Keras auto-suffixes
# them ("precision_1", "recall_1", ...) whenever this cell is re-run in the same
# session, and lookups such as history.history['precision'] then raise KeyError.
model.compile(
    loss=loss_function,
    optimizer=opt,
    metrics=['accuracy', metrics.Precision(name='precision'), metrics.Recall(name='recall')],
)

In [None]:
# Viewing the summary of the model
model.summary()

In [None]:
# Plot the model
utils.plot_model(
    model,
    show_shapes=True,
    show_layer_names=True,
    expand_nested=True,
    show_layer_activations=True,
    dpi=300,
)

## **8.3 Add Model Callbacks**

In [None]:
# File Path to store the trained models
filepath = "./CNN-Models/model_{epoch:02d}-{val_accuracy:.2f}.h5"

# ModelCheckpoint callback to save the best model based on validation accuracy
checkpoint = callbacks.ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

# Early stopping callback with patience of 2
early_stopping = callbacks.EarlyStopping(monitor='val_accuracy', patience=2, verbose=1)

# Learning rate decay callback using LearningRateScheduler
def lr_schedule(epoch):
    """Step-decay learning rate: start at 1e-4 and multiply by 0.9 every 10 epochs.

    Args:
        epoch: Zero-based epoch index.

    Returns:
        The learning rate to use for that epoch.
    """
    base_rate, shrink, interval = 1e-4, 0.9, 10

    # Number of full decay intervals completed so far
    completed_steps = epoch // interval
    return base_rate * (shrink ** completed_steps)

# Initialize the learning rate scheduler
lr_decay = callbacks.LearningRateScheduler(lr_schedule)

# List of callbacks: EarlyStopping, ModelCheckpoint and LearningRateScheduler
callbacks_list = [early_stopping, checkpoint, lr_decay]

## **8.4 Train the Model**

In [None]:
# Clear RAM
import gc
gc.collect()

In [None]:
# Training the model
history = model.fit(train_df, epochs=5, validation_data=test_df, callbacks=callbacks_list)

In [None]:
# Clear the RAM
gc.collect()

## **8.5 Visualize Model Performance**

In [None]:
# Keys of the Keras History dict (history.history) to plot. The list is deliberately
# NOT called `metrics`: that name is the tensorflow.keras.metrics module brought in by
# the star import, and shadowing it makes the compile cell fail when it is re-run.
metric_names = ['loss', 'accuracy', 'precision', 'recall']
colors = ['red', 'blue']

# Create a 2x2 grid of subplots, one per metric
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(8, 6))
fig.suptitle('Metrics Over Training')

# axes.flat walks the grid row by row, matching the order of metric_names
for ax, metric in zip(axes.flat, metric_names):
    # Plot training values
    ax.plot(history.history[metric], color=colors[0], label=f'Training {metric.capitalize()}')

    # Plot validation values
    ax.plot(history.history[f'val_{metric}'], color=colors[1], label=f'Validation {metric.capitalize()}')

    # Set the title and legend
    ax.set_title(metric.capitalize())
    ax.legend()

# Adjust layout, leaving room at the top for the figure title
plt.tight_layout(rect=[0, 0, 1, 0.96])

# Show the plot
plt.show()

# **9. Make a Prediction on a Single Clip**

## **9.1 Get One Batch and Make a Prediction**

In [None]:
# Get a sample from the test set
X_test, y_test = test_df.as_numpy_iterator().next()

In [None]:
# View shape of data
X_test.shape, y_test.shape

In [None]:
# Get the predictions
y_pred = model.predict(X_test)

# View the predictions
y_pred

## **9.2 Convert Probabilities to Classes**

In [None]:
# Import
import numpy as np

# Flatten the predictions and convert to classes
y_pred = np.round(y_pred.flatten())

# View the new y_pred
y_pred

In [None]:
# Print out the actual values
y_test

## **9.3 Classification Report**

In [None]:
# Import
from sklearn.metrics import classification_report

# Print the classification report
print(classification_report(y_test, y_pred))

# **10. Load Model**

## **10.1 Load Pretrained Model**

In [None]:
# Imports
import glob

# Checkpoints are written by the ModelCheckpoint callback to "./CNN-Models";
# search the same relative folder so saving and loading always agree.
saved_models = glob.glob("./CNN-Models/*.h5")

# Fail with a clear message instead of an IndexError when nothing was saved
if not saved_models:
    raise FileNotFoundError("No saved models found in ./CNN-Models - train the model first.")

# With save_best_only=True every newly written checkpoint beats all earlier ones,
# so the most recently modified file is the best model. Using the modification
# time also ignores stale, higher-numbered files left over from an earlier run.
best_model = max(saved_models, key=os.path.getmtime)

# Load the model
model = tf.keras.models.load_model(best_model)

# View model summary
model.summary()

# **11. Build Forest Parsing Functions**

## **11.1 Load Up MP3s**

In [None]:
# Function to load the MP3 audio files
def load_mp3_16k_mono(filename):
    """Load an MP3 file and return it as a single-channel waveform resampled to 16 kHz.

    Args:
        filename: Path of the audio file to load.

    Returns:
        The channel-averaged audio, resampled from the file's own sample rate
        to 16000 Hz.
    """
    # Open the audio file
    res = tfio.audio.AudioIOTensor(filename)

    # Average over axis 1 to combine the channels. The mean gives the same
    # result as sum / 2 for two-channel audio and, unlike the hard-coded
    # divisor, does not halve the amplitude of a one-channel file.
    # NOTE(review): assumes to_tensor() yields float samples shaped (samples, channels) - confirm.
    tensor = tf.math.reduce_mean(res.to_tensor(), axis=1)

    # Extract sample rate and cast
    sample_rate = tf.cast(res.rate, dtype=tf.int64)

    # Resample to 16 kHz
    wav = tfio.audio.resample(tensor, rate_in=sample_rate, rate_out=16000)

    # Return the wav
    return wav

In [None]:
# Load a sample file using the function
wav = load_mp3_16k_mono("/kaggle/input/z-by-hp-unlocked-challenge-3-signal-processing/Forest Recordings/recording_00.mp3")

# View the data
wav

## **11.2 Slice the Sample Audio**

In [None]:
# Cut the recording into consecutive windows of 56000 samples (stride equals window length)
audio_slices = tf.keras.utils.timeseries_dataset_from_array(
    wav,
    wav,
    sequence_length=56000,
    sequence_stride=56000,
    batch_size=1,
)

# Pull the first element (window and its paired target) out of the slices
sample, idx = next(audio_slices.as_numpy_iterator())

In [None]:
# View the shape of audio and number of audio slices
sample.shape, len(audio_slices)

## **11.3 Function to Convert Clips into Windowed Spectrograms**

In [None]:
# Function to preprocess and get the spectrogram
def preprocess_mp3(sample, idx):
    """Convert one windowed audio slice into a magnitude spectrogram.

    Args:
        sample: Batched slice; only its first entry along the leading axis is used.
        idx: Second element of the dataset item; not used.

    Returns:
        The STFT magnitude of the slice with an extra trailing axis added.
    """
    # Take the first (only used) entry of the batch
    window = sample[0]

    # Left-pad with zeros up to 56000 samples
    leading_zeros = tf.zeros([56000] - tf.shape(window), dtype=tf.float32)
    padded = tf.concat([leading_zeros, window], 0)

    # Short-time Fourier transform, reduced to its magnitude
    magnitude = tf.abs(tf.signal.stft(padded, frame_length=320, frame_step=32))

    # Append a trailing axis at position 2
    return tf.expand_dims(magnitude, axis=2)

## **11.4 Convert Longer Clips into Windows**

In [None]:
# Window the recording, convert every window to a spectrogram, then batch 64 at a time
audio_slices = (
    tf.keras.utils.timeseries_dataset_from_array(
        wav, wav, sequence_length=56000, sequence_stride=56000, batch_size=1
    )
    .map(preprocess_mp3)
    .batch(64)
)

# **12. Prediction on Sample**

## **12.1 Make Predictions on Sample**

In [None]:
# Imports
import numpy as np

# Get the predictions
pred_labels = model.predict(audio_slices)

In [None]:
# Round the predictions
pred_labels = np.round(pred_labels.flatten())

# # Round the predictions with increased confidence
# pred_labels = np.where(pred_labels.flatten() > 0.8, 1, 0)

In [None]:
# View the predicted labels
len(pred_labels), pred_labels

In [None]:
# Get the sum to count the number of times the bird sound was found
np.sum(pred_labels)

## **12.2 Group Consecutive Detections**

In [None]:
# Imports
from itertools import groupby

# Apply the function to predictions
pred_labels = [key for key, group in groupby(pred_labels)]

# View the grouped results
pred_labels

In [None]:
# Get the sum to get the final number of calls
np.sum(pred_labels)

# **13. Prediction on All Forest Recordings**

## **13.1 Get the Number of Calls in Each Recording**

In [None]:
# Import (tqdm.auto picks the notebook widget when available; tqdm_notebook is deprecated)
from tqdm.auto import tqdm

# Folder holding the forest recordings (defined once instead of repeating the literal)
recordings_dir = "/kaggle/input/z-by-hp-unlocked-challenge-3-signal-processing/Forest Recordings"

# List to store the (file name, call count) results
results = []

# Traverse over the folder for files
for file in tqdm(os.listdir(recordings_dir)):
    # Get the path to file
    file_path = os.path.join(recordings_dir, file)

    # Get the wave for the file
    wav = load_mp3_16k_mono(file_path)

    # Cut the recording into 56000-sample windows and convert each to a spectrogram
    audio_slices = tf.keras.utils.timeseries_dataset_from_array(
        wav, wav,
        sequence_length=56000,
        sequence_stride=56000,
        batch_size=1
    ).map(preprocess_mp3).batch(64)

    # Get the predictions
    pred_labels = model.predict(audio_slices)

    # Round the predictions to 0/1
    pred_labels = np.round(pred_labels.flatten())

    # Collapse runs of identical consecutive labels so one long call is counted once
    pred_labels = [key for key, group in groupby(pred_labels)]

    # Sum the remaining labels to get the number of calls; store it as a plain int
    # so the exported counts are whole numbers rather than numpy floats (e.g. 5.0)
    total_calls = int(np.sum(pred_labels))

    # Add the result for this recording to the list
    results.append((file, total_calls))

## **13.2 Convert the List to Pandas Dataframe**

In [None]:
# Imports
import pandas as pd

# Create pandas dataframe
result_df = pd.DataFrame(results, columns=["recording", "capuchin_calls"])

In [None]:
# View the dataframe
result_df

In [None]:
# Sort the data by recordings column and reset index
result_df = result_df.sort_values(by=["recording"]).reset_index(drop=True)

# View the dataframe
result_df

In [None]:
# Export to csv file
result_df.to_csv("capuchinbird_results.csv", index=False)