In [None]:
import kagglehub

# Fetch the dataset identified by its Kaggle handle; the value returned by
# dataset_download is kept in `path` and used as the dataset location below.
dataset_handle = "shreyj1729/best-of-watkins-marine-mammal-sound-database"
path = kagglehub.dataset_download(dataset_handle)
print("Path to dataset files:", path)

In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import librosa
import librosa.display
import cv2

# Parameters for audio processing
input_directory = os.path.join(path, 'data')  # Root directory of the dataset
output_directory = "processed_spectrograms"
sampling_rate = 8000  # Target sampling rate
n_fft = 256  # FFT size
image_size = (28, 28)  # Resize spectrogram images to 28x28 pixels

# Get the list of categories (folder names); sorting fixes the label <-> category mapping
categories = sorted(
    d for d in os.listdir(input_directory)
    if os.path.isdir(os.path.join(input_directory, d))
)

# Ensure output directory exists
os.makedirs(output_directory, exist_ok=True)

# Initialize data and labels
X = []  # Spectrogram image data
y = []  # Labels corresponding to categories (index into `categories`)

# Loop through categories to process WAV files
for label, category in enumerate(categories):
    category_path = os.path.join(input_directory, category)
    print(f"Processing category: {category}")

    # os.listdir returns entries in arbitrary order; sorting keeps the sample order
    # (and therefore the later seeded train/test split) identical across machines.
    for filename in sorted(os.listdir(category_path)):
        # Accept the extension regardless of case (".wav", ".WAV", ...)
        if not filename.lower().endswith(".wav"):
            continue

        input_file = os.path.join(category_path, filename)
        output_file = os.path.join(output_directory, f"{category}_{os.path.splitext(filename)[0]}.png")

        try:
            # Load audio file and resample to the specified sampling rate
            y_audio, sr = librosa.load(input_file, sr=sampling_rate)

            # Generate spectrogram: STFT magnitude in dB relative to the peak value
            D = librosa.amplitude_to_db(np.abs(librosa.stft(y_audio, n_fft=n_fft)), ref=np.max)

            # Plot and save spectrogram; the figure is closed even if plotting or
            # saving fails so that a bad file does not leave an open figure behind.
            fig = plt.figure(figsize=(5, 5))
            try:
                librosa.display.specshow(D, sr=sampling_rate, x_axis='time', y_axis='log', cmap='viridis')
                plt.axis('off')  # Turn off the axis
                plt.tight_layout()
                plt.savefig(output_file, bbox_inches='tight', pad_inches=0)
            finally:
                plt.close(fig)

            # Read saved spectrogram back as an image array
            img = cv2.imread(output_file)
            if img is None:
                # cv2.imread signals an unreadable file by returning None instead of raising
                print(f"Could not read back spectrogram image for {filename}; sample skipped")
                continue

            X.append(cv2.resize(img, image_size))
            y.append(label)
            print(f"Processed and saved spectrogram for {filename}")

        except Exception as e:
            # Best effort: report the failing file and carry on with the rest
            print(f"Error processing {input_file}: {e}")

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

# Convert lists to numpy arrays
# NOTE: this cell rebinds X and y, so re-run the preprocessing cell before re-running it.
X = np.array(X, dtype=np.float32) / 255.0  # Scale 8-bit pixel values into [0, 1]
y = np.array(y, dtype=np.int32)  # Labels as integers (indices into `categories`)

# One-hot encode the labels.
# - `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and removed in 1.4,
#   so try the new keyword first and fall back to the old one on older releases.
# - Listing every class id explicitly keeps the number of one-hot columns equal to
#   len(categories) (the size of the model's output layer) even if some category
#   contributed no usable sample.
label_ids = [np.arange(len(categories))]
try:
    encoder = OneHotEncoder(categories=label_ids, sparse_output=False)
except TypeError:
    encoder = OneHotEncoder(categories=label_ids, sparse=False)
y = encoder.fit_transform(y.reshape(-1, 1))

print(X.shape)
print(y.shape)

# Split data into training and testing sets (20% held out for testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Split the training set further into training and validation sets (20% of the remainder)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Summary
print(f"X_train shape: {X_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"y_test shape: {y_test.shape}")
print(f"X_val shape: {X_val.shape}")
print(f"y_val shape: {y_val.shape}")

# Save categories for reference (one "index: name" line per class)
with open("categories.txt", "w") as f:
    for i, category in enumerate(categories):
        f.write(f"{i}: {category}\n")
print("Categories saved to 'categories.txt'")

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dropout, MaxPooling2D, Flatten, Dense, BatchNormalization, Activation, GlobalAveragePooling2D, Add
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.utils import to_categorical


# Build the model
def create_model(input_shape, num_classes):
    """Build the (uncompiled) convolutional classifier.

    Architecture, in order:
      1. Two 32-filter Conv/BatchNorm/ReLU stages, 2x2 max-pool, dropout 0.3.
      2. Two 64-filter conv stages whose output is added to a 1x1-conv projection
         of the block input (residual connection), ReLU, 2x2 max-pool, dropout 0.4.
      3. Two 128-filter Conv/BatchNorm/ReLU stages, 2x2 max-pool, dropout 0.5.
      4. Global average pooling, Dense(256) and Dense(128) with ReLU, each followed
         by dropout 0.5, then a softmax layer with `num_classes` units.

    Args:
        input_shape: Shape passed to the Keras `Input` layer.
        num_classes: Number of units in the final softmax layer.

    Returns:
        A Keras `Model` mapping the input tensor to class probabilities.
    """

    def conv_bn(tensor, filters, activate=True):
        # 3x3 same-padded convolution followed by batch norm and (optionally) ReLU.
        tensor = Conv2D(filters, (3, 3), padding='same')(tensor)
        tensor = BatchNormalization()(tensor)
        if activate:
            tensor = Activation('relu')(tensor)
        return tensor

    def pool_and_drop(tensor, rate):
        # Halve the spatial resolution, then apply dropout at the given rate.
        tensor = MaxPooling2D(pool_size=(2, 2))(tensor)
        return Dropout(rate)(tensor)

    inputs = Input(shape=input_shape)

    # Block 1: plain stack of two 32-filter stages
    net = conv_bn(inputs, 32)
    net = conv_bn(net, 32)
    net = pool_and_drop(net, 0.3)

    # Block 2: residual block; the ReLU of the second stage is deferred until after the sum
    block_input = net
    net = conv_bn(net, 64)
    net = conv_bn(net, 64, activate=False)
    projected = Conv2D(64, (1, 1), padding='same')(block_input)  # 1x1 conv to match channel count
    net = Add()([net, projected])
    net = Activation('relu')(net)
    net = pool_and_drop(net, 0.4)

    # Block 3: wider stack of two 128-filter stages
    net = conv_bn(net, 128)
    net = conv_bn(net, 128)
    net = pool_and_drop(net, 0.5)

    # Global pooling instead of Flatten to reduce parameter count
    net = GlobalAveragePooling2D()(net)

    # Classifier head: two dense layers, each regularised with dropout
    for units in (256, 128):
        net = Dense(units, activation='relu')(net)
        net = Dropout(0.5)(net)

    outputs = Dense(num_classes, activation='softmax')(net)
    return Model(inputs=inputs, outputs=outputs)

# Model parameters derived from the preprocessing settings
num_classes = len(categories)  # One output unit per category folder
input_shape = (image_size[0], image_size[1], 3)  # Resized spectrogram images with 3 color channels

# Instantiate the network
model = create_model(input_shape, num_classes)

# Compile: SGD with momentum, categorical cross-entropy for the one-hot labels
optimizer = SGD(learning_rate=0.01, momentum=0.9)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# Callbacks, both driven by validation loss:
# - halve the learning rate after 3 epochs without improvement (never below 1e-6)
# - stop after 5 epochs without improvement and restore the best weights
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6)
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model, validating on the held-out validation split after every epoch
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=32,
    epochs=50,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping, reduce_lr],
)

# Persist the trained model to disk
model.save('Model.h5')
print("Model saved to 'Model.h5'")

In [None]:
# Load the trained model (if saved earlier).
# Import load_model from tensorflow.keras, the same namespace used to build and save
# the model, rather than the standalone `keras` package, so that saving and loading
# go through one Keras implementation.
from tensorflow.keras.models import load_model
model = load_model('Model.h5')

# Evaluate on the test data
test_loss, test_accuracy = model.evaluate(X_test, y_test, batch_size=32)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

# Make predictions on the test data (one row of class probabilities per sample)
predictions = model.predict(X_test)

# Reduce each row to a class index: the most probable class for the predictions,
# and the position of the 1 in the one-hot vector for the ground truth.
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(y_test, axis=1)

# Check a few predictions
print(f"Predicted: {predicted_classes[:10]}")
print(f"True: {true_classes[:10]}")


In [None]:
from sklearn.metrics import classification_report, confusion_matrix

# Report against the full, ordered list of class ids so that every row/column maps to
# the same category index used during training, even when a class is absent from the
# test split or never predicted.
class_ids = list(range(len(categories)))

# Classification report (per-category precision / recall / F1, labelled by category name)
print("Classification Report:")
print(classification_report(
    true_classes,
    predicted_classes,
    labels=class_ids,
    target_names=categories,
    zero_division=0,  # classes with no predictions/samples score 0 without a warning
))

# Confusion matrix: rows are true classes, columns are predicted classes, in `categories` order
print("Confusion Matrix:")
conf_matrix = confusion_matrix(true_classes, predicted_classes, labels=class_ids)
print(conf_matrix)
