In [None]:
import shutil
import os

# Start from a clean state: delete any previously extracted 'embedding_vectors'
# folder (and everything inside it) before the archive is downloaded again.
if not os.path.exists('embedding_vectors'):
    print("Folder 'embedding_vectors' does not exist.")
else:
    shutil.rmtree('embedding_vectors')
    print("Folder 'embedding_vectors' removed.")

In [None]:
import csv
import pandas as pd
import librosa
import zipfile
import scipy
from scipy import signal
from scipy.io import wavfile
from joblib import dump, load
from sklearn.metrics import (make_scorer, f1_score, accuracy_score, confusion_matrix,
                             roc_auc_score, roc_curve, auc, precision_score,
                             recall_score, average_precision_score, precision_recall_curve)
import seaborn as sns
from tqdm.notebook import trange, tqdm
from matplotlib import pyplot as plt
import numpy as np

Auxiliary functions


In [None]:
def metrics_calculation(y_test, y_pred, decision_scores, label="Anomaly"):
    """Plot the confusion matrix and report binary detection metrics.

    Class 1 is the positive ('Sauim') class; class 0 is displayed under the
    name given by ``label``.

    Parameters
    ----------
    y_test : array-like
        Ground-truth binary labels (0 or 1).
    y_pred : array-like
        Predicted binary labels (0 or 1).
    decision_scores : array-like
        Continuous scores passed to ``roc_curve`` with ``pos_label=1``
        (larger values rank a sample as more likely class 1).
    label : str, default "Anomaly"
        Display name of class 0 on the heatmap; also used in the name of the
        saved PDF file.

    Returns
    -------
    fpr, tpr : ndarray
        False/true positive rates returned by ``roc_curve``.
    roc_auc : float
        Area under the ROC curve.
    cm : ndarray
        2x2 confusion matrix with rows/columns ordered as [0, 1].

    Side effects
    ------------
    Saves ``ConfusionMatrix_<label>.pdf`` in the working directory, shows the
    figure and prints a one-line metric summary.
    """
    # Pin the label order so the matrix is always 2x2 and matches the two tick
    # labels below, even when one class is absent from y_test/y_pred.
    cm = confusion_matrix(y_test, y_pred, labels=[0, 1])

    # Confusion-matrix heatmap (axis labels and title are left unset).
    plt.figure(figsize=(4.2, 4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', annot_kws={"size": 18},  cbar=False,
                xticklabels=[label, 'Sauim'], yticklabels=[label, 'Sauim'])
    plt.xticks(fontsize=16)
    plt.yticks(fontsize=16)
    plt.savefig("ConfusionMatrix_"+label+".pdf", format='pdf', dpi=300, bbox_inches='tight')
    plt.show()

    # Accuracy may be misleading with imbalanced data.
    accuracy = accuracy_score(y_test, y_pred)

    # Precision, recall and F1 are all reported for the positive class (1).
    # average='binary' is required for pos_label to take effect; with
    # average='weighted' the F1 would be a support-weighted mean over both
    # classes and would not correspond to the precision/recall printed below.
    precision = precision_score(y_test, y_pred, pos_label=1)
    recall = recall_score(y_test, y_pred, pos_label=1)
    f1 = f1_score(y_test, y_pred, pos_label=1, average='binary')

    # ROC curve over the continuous scores; 1 is the positive class.
    fpr, tpr, _ = roc_curve(y_test, decision_scores, pos_label=1)
    roc_auc = auc(fpr, tpr)

    print(f"Acc: {accuracy:.2f}, Prec: {precision:.2f}, Rec: {recall:.2f}, F1: {f1:.2f}, roc_auc: {roc_auc:.2f}")
    return fpr, tpr, roc_auc, cm

## Load an unknown soundscape

In [None]:
!wget https://github.com/juancolonna/Sauim/raw/main/embedding_vectors.zip -O embedding_vectors.zip

# Open and extract files
with zipfile.ZipFile('embedding_vectors.zip', 'r') as zip_ref:
    zip_ref.extractall('.')

print(f'Files extracted')

soundscape_vectors = np.load('embedding_vectors/soundscape_vectors.npy')
soundscape_vectors_filtered = np.load('embedding_vectors/soundscape_vectors_filtered.npy')

## Load wav records

In [None]:
# Download the two serialized models (loaded later with joblib.load).
!wget https://github.com/juancolonna/Sauim/raw/main/ocsvm.joblib -O ocsvm.joblib
!wget https://github.com/juancolonna/Sauim/raw/main/ocsvm_filtered.joblib -O ocsvm_filtered.joblib
# Download the annotation CSV and the recording in its original and filtered
# versions. The remote names of the first two contain a URL-encoded space
# (%20); they are saved locally with an underscore instead.
!wget https://github.com/juancolonna/Sauim/raw/main/records/Mindu_Saguinus%20bicolor_02.02.19-000.csv -O Mindu_Saguinus_bicolor_02.02.19-000.csv
!wget https://github.com/juancolonna/Sauim/raw/main/records/Mindu_Saguinus%20bicolor_02.02.19-000.wav -O Mindu_Saguinus_bicolor_02.02.19-000.wav
!wget https://github.com/juancolonna/Sauim/raw/main/records/Mindu_Saguinus_bicolor_02.02.19-000_filtered.wav -O Mindu_Saguinus_bicolor_02.02.19-000_filtered.wav

# Annotation table; later cells read its 'TIME' column and compare it against
# window start times expressed in seconds.
df = pd.read_csv('Mindu_Saguinus_bicolor_02.02.19-000.csv')

## Load fitted OCSVM model

In [None]:
# Load the fitted One-Class SVM.
# NOTE: joblib.load unpickles arbitrary objects; only load model files that
# come from a trusted source.
clf = load("ocsvm.joblib")

sr = 32000                 # sampling rate requested from librosa (Hz)
window_len = int(5*sr)     # analysis window length in samples (5 s)
step = 1                   # hop between consecutive windows, in seconds
hop_len = int(step*sr)     # emulates a sliding window of 5 s length and 1 s step

y, sr = librosa.load('Mindu_Saguinus_bicolor_02.02.19-000.wav', sr=sr)
# Peak-normalise to [-1, 1]; skip the division for an all-zero signal.
peak = np.max(np.abs(y))
if peak > 0:
    y = y / peak

# Signed decision-function value (positive = normal, negative = anomaly).
decision_scores = clf.decision_function(soundscape_vectors)

# Adjust this value: the threshold should be 0 when the unfiltered version is
# used, otherwise -0.01.
decision_threshold = 0.0
soundscape_preds = np.where(decision_scores >= decision_threshold, 1, 0)

print(f"Total detections: {np.sum(soundscape_preds == 1)}")
print(f"Confidence range: {decision_scores.min():.3f} to {decision_scores.max():.3f}")

# Expand the per-window predictions into a per-sample mask: every positive
# window i marks samples [i*hop_len, i*hop_len + window_len), clipped to the
# end of the signal by the slice itself.
preds_expanded = np.zeros(len(y), dtype=int)
for i in np.flatnonzero(soundscape_preds):
    start = i * hop_len
    if start >= len(y):
        break  # indices are ascending, so every later window is out of range too
    preds_expanded[start:start + window_len] = 1

# Start time (in seconds) of every window: one entry per prediction,
# regardless of the step size.
window_init_time = np.arange(len(soundscape_preds)) * (hop_len / sr)

# A window is labelled positive when any annotated time lies strictly within
# +/- 2.5 s of its start time (simulating a window with 50% overlap).
# NOTE(review): the detection mask above paints [start, start + 5 s) while the
# labels use (start - 2.5 s, start + 2.5 s) -- confirm this offset is intended.
annotation_times = np.asarray(df['TIME'].values, dtype=float)
window_starts = window_init_time[:, None]
in_window = (window_starts - 2.5 < annotation_times[None, :]) & (annotation_times[None, :] < window_starts + 2.5)
manual_annotations = in_window.any(axis=1).astype(int)

fpr6, tpr6, roc_auc_detections, cm6 = metrics_calculation(manual_annotations, soundscape_preds, decision_scores, label="Background")

# ROC curve of the detections against the chance diagonal.
plt.figure(figsize=(5.4, 5))
plt.plot(fpr6, tpr6, lw=1.5, label='Detections (AUC = %0.2f)' % roc_auc_detections)
plt.plot([0, 1], [0, 1], color='navy', lw=1.5, linestyle='--')
plt.xlim([-0.01, 1.0])
plt.ylim([0.0, 1.01])
plt.legend(loc="lower right", fontsize=10)
plt.ylabel('True Positive Rate', fontsize=12)
plt.xlabel('False Positive Rate', fontsize=12)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)
plt.savefig("ROC_curve_detections.pdf", format='pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Summary statistics of the decision scores.
print("Decision function stats:")
print(f"min: {decision_scores.min():.4f}")
print(f"max: {decision_scores.max():.4f}")
print(f"mean: {decision_scores.mean():.4f}")

# Histogram of the decision scores with the threshold marked at 0.
plt.figure(figsize=(6, 5))
plt.hist(decision_scores, bins=100)
# axvline spans the full axes height, so the marker no longer depends on a
# hard-coded bar height that may not match the histogram counts.
plt.axvline(0, linestyle='dashed', color='red', label='Decision threshold')
plt.xlabel("Decision scores", fontsize=12)
plt.ylabel("Count", fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)
plt.legend(loc="upper left", fontsize=10)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.savefig("decision_scores.pdf", format='pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Time axis in seconds: sample index divided by the sampling rate.
time = np.arange(len(y)) / sr

plt.figure(figsize=(23, 4))
plt.plot(time, y)                    # waveform
plt.plot(time, preds_expanded, 'r')  # per-sample detection mask
# One dashed vertical line per annotated time.
for annotation_time in df['TIME'].values:
    plt.axvline(x=annotation_time, color='black', linestyle='--')

# Fixed x-range; use np.max(time) as the upper limit to show the whole signal.
plt.xlim(0, 325)
plt.ylim(-1, 1.05)
plt.grid(False)
plt.xlabel("Time (seconds)", fontsize=16)
plt.ylabel("Amplitude", fontsize=16)
# Labels are matched to the plotted artists in order: waveform, mask, first line.
plt.legend(["Sinal", "Detection", "Annotations"], loc='lower left', fontsize=14)
plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
plt.savefig("detections.pdf", format='pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
def find_segments(binary):
    """
    Return a list of (start, end) sample-index pairs, one per contiguous run
    where ``binary == 1``. ``start`` is the first index of the run and ``end``
    is one past its last index, so ``signal[start:end]`` selects the run.
    An input with no ones yields an empty list.
    """
    mask = np.asarray(binary).astype(int)
    if not mask.sum():
        return []

    # Zero-pad both ends so runs touching either edge still produce a
    # rising (+1) and a falling (-1) transition.
    padded = np.concatenate([[0], mask, [0]])
    edges = padded[1:] - padded[:-1]
    rising = np.flatnonzero(edges == 1)
    falling = np.flatnonzero(edges == -1)

    return [(first, stop) for first, stop in zip(rising, falling)]

# Locate the contiguous detected regions (no minimum-duration filter).
segments = find_segments(preds_expanded)

print(f"Found {len(segments)} segments")

# Set to True to title every spectrogram with its time span and decision score.
show_segment_title = False

# Render one spectrogram figure per detected segment.
for i, (s, e) in enumerate(segments):
    y_seg = y[s:e]

    # Index of the prediction window that starts at sample s.
    pred_index = s // hop_len
    conf = decision_scores[pred_index] if pred_index < len(decision_scores) else None

    # Magnitude STFT converted to dB relative to the segment's maximum.
    S = np.abs(librosa.stft(y_seg, n_fft=2048, hop_length=256))
    S_db = librosa.amplitude_to_db(S, ref=np.max)

    # New figure for each spectrogram.
    plt.figure(figsize=(10, 3), dpi=200)
    librosa.display.specshow(S_db, sr=sr, hop_length=256,
                             x_axis='time', y_axis='hz', cmap='magma')

    if show_segment_title:
        title = f"Segment {i+1}: {s/sr:.1f}-{e/sr:.1f}s"
        if conf is not None:
            title += f" | decision score: {conf:.4f}"
        plt.title(title)

    plt.tight_layout()
    plt.show()

In [None]:
# Load the model stored in ocsvm_filtered.joblib.
# NOTE: joblib.load unpickles arbitrary objects; only load model files that
# come from a trusted source.
clf = load("ocsvm_filtered.joblib")

# sr, window_len and hop_len are reused from the cell that processes the
# unfiltered recording.
y, sr = librosa.load('Mindu_Saguinus_bicolor_02.02.19-000_filtered.wav', sr=sr)
# Peak-normalise to [-1, 1]; skip the division for an all-zero signal.
peak = np.max(np.abs(y))
if peak > 0:
    y = y / peak

# Signed decision-function value (positive = normal, negative = anomaly).
decision_scores_filtered = clf.decision_function(soundscape_vectors_filtered)

# Adjust this value: the threshold should be 0 when the unfiltered version is
# used, otherwise -0.01.
# NOTE(review): this cell uses the filtered model but the threshold is 0.0,
# not the -0.01 mentioned above -- confirm which value is intended.
decision_threshold_filtered = 0.0
soundscape_preds_filtered = np.where(decision_scores_filtered >= decision_threshold_filtered, 1, 0)

print(f"Total detections: {np.sum(soundscape_preds_filtered == 1)}")
print(f"Confidence range: {decision_scores_filtered.min():.3f} to {decision_scores_filtered.max():.3f}")

# Expand the per-window predictions into a per-sample mask: every positive
# window i marks samples [i*hop_len, i*hop_len + window_len), clipped to the
# end of the signal by the slice itself.
preds_expanded_filtered = np.zeros(len(y), dtype=int)
for i in np.flatnonzero(soundscape_preds_filtered):
    start = i * hop_len
    if start >= len(y):
        break  # indices are ascending, so every later window is out of range too
    preds_expanded_filtered[start:start + window_len] = 1

# Start time (in seconds) of every window: one entry per prediction,
# regardless of the step size.
window_init_time = np.arange(len(soundscape_preds_filtered)) * (hop_len / sr)

# A window is labelled positive when any annotated time lies strictly within
# +/- 2.5 s of its start time (simulating a window with 50% overlap).
# NOTE(review): the detection mask above paints [start, start + 5 s) while the
# labels use (start - 2.5 s, start + 2.5 s) -- confirm this offset is intended.
annotation_times = np.asarray(df['TIME'].values, dtype=float)
window_starts = window_init_time[:, None]
in_window = (window_starts - 2.5 < annotation_times[None, :]) & (annotation_times[None, :] < window_starts + 2.5)
manual_annotations = in_window.any(axis=1).astype(int)

fpr7, tpr7, roc_auc_detections_filtered, cm7 = metrics_calculation(manual_annotations,
                                                                   soundscape_preds_filtered,
                                                                   decision_scores_filtered,
                                                                   label="Background")

# ROC curve of the detections against the chance diagonal.
plt.figure(figsize=(5.4, 5))
plt.plot(fpr7, tpr7, lw=1.5, label='Detections (AUC = %0.2f)' % roc_auc_detections_filtered)
plt.plot([0, 1], [0, 1], color='navy', lw=1.5, linestyle='--')
plt.xlim([-0.01, 1.0])
plt.ylim([0.0, 1.01])
plt.legend(loc="lower right", fontsize=10)
plt.ylabel('True Positive Rate', fontsize=12)
plt.xlabel('False Positive Rate', fontsize=12)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)
plt.savefig("ROC_curve_detections_filtered.pdf", format='pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Time axis in seconds: sample index divided by the sampling rate.
time = np.arange(len(y)) / sr

plt.figure(figsize=(23, 4))
plt.plot(time, y)                             # waveform
plt.plot(time, preds_expanded_filtered, 'r')  # per-sample detection mask
# One dashed vertical line per annotated time.
for annotation_time in df['TIME'].values:
    plt.axvline(x=annotation_time, color='black', linestyle='--')

# Fixed x-range; use np.max(time) as the upper limit to show the whole signal.
plt.xlim(0, 325)
plt.ylim(-1, 1.05)
plt.grid(False)
plt.xlabel("Time (seconds)", fontsize=16)
plt.ylabel("Amplitude", fontsize=16)
# Labels are matched to the plotted artists in order: waveform, mask, first line.
plt.legend(["Sinal", "Detection", "Annotations"], loc='lower left', fontsize=14)
plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
plt.savefig("detections_filtered.pdf", format='pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
def find_segments(binary):
    """
    Return a list of (start, end) sample-index pairs, one per contiguous run
    where ``binary == 1``. ``start`` is the first index of the run and ``end``
    is one past its last index. An input with no ones yields an empty list.

    NOTE(review): a function with this name and the same logic is already
    defined in an earlier cell of this notebook; this redefinition shadows it
    and could be dropped.
    """
    flags = np.asarray(binary).astype(int)
    if not flags.sum():
        return []

    # Zero-pad both ends so runs touching either edge still produce a
    # rising (+1) and a falling (-1) transition.
    padded = np.concatenate([[0], flags, [0]])
    steps = padded[1:] - padded[:-1]
    run_starts = np.flatnonzero(steps == 1)
    run_ends = np.flatnonzero(steps == -1)

    return [(first, stop) for first, stop in zip(run_starts, run_ends)]

# Locate the contiguous detected regions (no minimum-duration filter).
segments = find_segments(preds_expanded_filtered)

print(f"Found {len(segments)} segments")

# Set to True to title every spectrogram with its time span and decision score.
show_segment_title = False

# Render one spectrogram figure per detected segment.
for i, (s, e) in enumerate(segments):
    y_seg = y[s:e]

    # Index of the prediction window that starts at sample s.
    pred_index = s // hop_len
    conf = decision_scores_filtered[pred_index] if pred_index < len(decision_scores_filtered) else None

    # Magnitude STFT converted to dB relative to the segment's maximum.
    S = np.abs(librosa.stft(y_seg, n_fft=2048, hop_length=256))
    S_db = librosa.amplitude_to_db(S, ref=np.max)

    # New figure for each spectrogram.
    plt.figure(figsize=(10, 3), dpi=200)
    librosa.display.specshow(S_db, sr=sr, hop_length=256,
                             x_axis='time', y_axis='hz', cmap='magma')

    if show_segment_title:
        title = f"Segment {i+1}: {s/sr:.1f}-{e/sr:.1f}s"
        if conf is not None:
            title += f" | decision score: {conf:.4f}"
        plt.title(title)

    plt.tight_layout()
    plt.show()

In [None]:
# Overlay the ROC curves obtained without and with the filter, plus the
# chance diagonal, for a side-by-side comparison.
plt.figure(figsize=(5.4, 5))
plt.plot(fpr6, tpr6, lw=1.5, label='Without filter (AUC = %0.2f)' % roc_auc_detections)
plt.plot(fpr7, tpr7, lw=1.5, label='With filter (AUC = %0.2f)' % roc_auc_detections_filtered)
plt.plot([0, 1], [0, 1], color='navy', lw=1.5, linestyle='--')
plt.xlim([-0.01, 1.0])
plt.ylim([0.0, 1.01])
plt.legend(loc="lower right", fontsize=10)
plt.ylabel('True Positive Rate', fontsize=12)
plt.xlabel('False Positive Rate', fontsize=12)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)
# Saved under its own name: "ROC_curve_detections.pdf" is already written by
# the unfiltered-detection cell and would otherwise be silently overwritten.
plt.savefig("ROC_curve_detections_comparison.pdf", format='pdf', dpi=300, bbox_inches='tight')
plt.show()