# Evaluation metrics

For each cloned audio file:
1.	Check whether the watermark is still present.
2.	Assess whether the content remains the same.
3.	Evaluate any changes in audio quality.

In [115]:
import os
import re
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
import torch.nn.functional as F
from audioseal import AudioSeal
from tqdm import tqdm
import librosa
import torch
import pandas as pd
import numpy as np
import torchaudio

In [116]:
# Dataset locations, relative to the notebook's working directory.
# Folders holding the audio clips to evaluate:
watermarked_path = "../Dataset/Watermarked Audio"
unwatermarked_path = "../Dataset/Unwatermarked Audio"
# CSV with the transcriptions of the original clips:
transcription_path = "../Dataset/Transcriptions/transcriptions_complete.csv"
# Folder that receives the evaluation result CSVs:
results_path = "../Dataset/Results"

In [113]:
# --- Watermark detector -------------------------------------------------
# Pretrained AudioSeal detector (16-bit message variant).
detector = AudioSeal.load_detector("audioseal_detector_16bits")

# --- Speech-to-text model -----------------------------------------------
# Run on the first GPU in half precision when CUDA is available,
# otherwise fall back to full precision on the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
    torch_dtype = torch.float16
else:
    device = "cpu"
    torch_dtype = torch.float32

model_id = "openai/whisper-large-v3-turbo"
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
)
model.to(device)

# The processor bundles the tokenizer and the feature extractor of the model.
processor = AutoProcessor.from_pretrained(model_id)

# High-level ASR pipeline built on the same model and processor parts.
pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
)

# Whisper and AudioSeal expect a sample rate of 16 kHz
target_sr = 16000

Device set to use cpu


In [64]:
# List the clips of each folder, keeping only ".mp3" files whose name
# contains "audioseal".
unwatermarked_files = [
    name
    for name in os.listdir(unwatermarked_path)
    if name.endswith(".mp3") and "audioseal" in name
]

watermarked_files = [
    name
    for name in os.listdir(watermarked_path)
    if name.endswith(".mp3") and "audioseal" in name
]

In [65]:
# Helper to extract the ID from the filename
def extract_id(filename):
    """Return the digits following ``common_voice_en_`` in *filename*.

    The first occurrence of the pattern anywhere in the name is used.
    Returns the digits as a string, or ``None`` when the pattern is absent.
    """
    hit = re.search(r'common_voice_en_(\d+)', filename)
    if hit is None:
        return None
    return hit.group(1)

In [66]:
# Index the file names of each folder by clip ID. When two files share an
# ID, the one listed last wins.
un_dict = dict(zip(map(extract_id, unwatermarked_files), unwatermarked_files))
w_dict = dict(zip(map(extract_id, watermarked_files), watermarked_files))

# Pair up the files whose ID appears in both folders:
# ID -> (unwatermarked file name, watermarked file name)
matched = {
    key: (un_dict[key], w_dict[key])
    for key in set(un_dict).intersection(w_dict)
}

print(f"Audios to evaluate: {len(matched):0,.0f}")

Audios to evaluate: 14,124


In [None]:
# Transcriptions of the original files
transcriptions = pd.read_csv(transcription_path)
# Derive the clip ID from the clip file name
transcriptions["id"] = transcriptions["clip"].apply(extract_id)

# Results table: one row per original clip, with the original transcription
# and confidence plus empty columns for the metrics computed below.
results = transcriptions[["id", "transcription", "confidence"]].rename(
    columns={
        "confidence": "original_confidence",
        "transcription": "original_transcription",
    }
)
results["prob_w"] = np.nan
results["unwatermarked_transcription"] = np.nan
results["unwatermarked_confidence"] = np.nan

# Rows that still miss at least one of the computed metrics
pending_mask = (
    results[["prob_w", "unwatermarked_transcription", "unwatermarked_confidence"]]
    .isna()
    .any(axis=1)
    .values
)
ids = results.loc[pending_mask, "id"].values

# Clips still to process: pending IDs that also have a matched audio pair.
# The processing loop looks each ID up in `matched`, so IDs without an entry
# there must be excluded (a set difference would keep exactly those IDs and
# make the lookup fail). `dict.fromkeys` drops duplicate IDs while keeping
# the order of the transcription file.
remaining_clips = [id_ for id_ in dict.fromkeys(ids) if id_ in matched]

In [None]:
# for id, m in tqdm(matched.items()):
#     un = m[0]
#     w = m[1]

#     # Load the unwatermarked audio file
#     un_wav, sr = librosa.load(os.path.join(unwatermarked_path, un), sr = target_sr)
#     # Convert to a PyTorch tensor
#     un_wav_tensor = torch.tensor(un_wav).unsqueeze(0).unsqueeze(0)  # Shape: (1, 1, samples)


#     # 1. Detect the watermark
#     prob, _ = detector.detect_watermark(un_wav_tensor, sr)
#     # Add the probability of watermarking to the results
#     results.loc[results["id"] == id, "prob_w"] = prob

# # 2. Transcribe audio

In [None]:
# Configuration
batch_size = 2**3  # Adjust based on your GPU memory
save_every = 100   # Write a checkpoint CSV every `save_every` batches
batch_num = 0

# Store results incrementally
results_batch = []     # Rows collected since the last checkpoint
all_results = []       # One DataFrame per checkpoint, for the final concat

# The checkpoint files are written into this folder; create it if missing.
os.makedirs(results_path, exist_ok=True)

# Every ID is looked up in `matched` below, so drop IDs that have no entry
# there instead of failing with a KeyError in the middle of the run.
unmatched_ids = [id_ for id_ in remaining_clips if id_ not in matched]
if unmatched_ids:
    print(f"Skipping {len(unmatched_ids):,} clips without a matched audio file")
    remaining_clips = [id_ for id_ in remaining_clips if id_ in matched]

# One resampler per source sample rate, built on first use and then reused.
resamplers = {}

# Iterate over remaining clips in batches. `remaining_clips` is consumed from
# the front, so after an interruption it holds the clips not yet processed.
for _ in tqdm(range(0, len(remaining_clips), batch_size)):
    batch_ids = remaining_clips[:batch_size]  # Take a batch of clip IDs
    batch_audio = []
    watermark_probs = []

    for id_ in batch_ids:
        un, _w = matched[id_]  # only the "unwatermarked" file is evaluated here
        clip_path = os.path.join(unwatermarked_path, un)

        # Load audio using torchaudio for Whisper compatibility
        wav, sr = torchaudio.load(clip_path)

        # Convert to mono if needed
        if wav.shape[0] > 1:
            wav = wav.mean(dim=0, keepdim=True)

        # Resample to target sampling rate
        if sr not in resamplers:
            resamplers[sr] = torchaudio.transforms.Resample(sr, target_sr)
        wav = resamplers[sr](wav)

        # --- Step 1: Watermark detection using librosa + torch ---
        # NOTE(review): the clip is decoded a second time with librosa here;
        # the torchaudio tensor above could probably be reused. Confirm that
        # the detector scores stay the same before changing this.
        wav_librosa, _ = librosa.load(clip_path, sr=target_sr)
        wav_tensor = torch.tensor(wav_librosa).unsqueeze(0).unsqueeze(0)  # (1, 1, samples)
        with torch.no_grad():  # inference only, no autograd graph needed
            prob, _ = detector.detect_watermark(wav_tensor, target_sr)
        watermark_probs.append(float(prob))  # Ensure float type

        # --- Step 2: Add audio to batch for transcription ---
        batch_audio.append(wav)

    # --- Step 3: Pad audio to equal length for batch processing ---
    max_len = max(clip.shape[1] for clip in batch_audio)
    batch_padded = [F.pad(clip, (0, max_len - clip.shape[1])) for clip in batch_audio]
    batch_tensor = torch.stack(batch_padded).squeeze(1)  # (batch, time)

    # --- Step 4: Convert to Whisper input format (list of numpy arrays) ---
    batch_np = [clip.cpu().numpy() for clip in batch_tensor]
    # Cast to the dtype the model was loaded with (`torch_dtype`): a fixed
    # float16 cast does not match the float32 model used on CPU.
    input_features = processor.feature_extractor(
        batch_np, sampling_rate=target_sr, return_tensors="pt"
    ).input_features.to(device, dtype=torch_dtype)

    # --- Step 5: Generate transcriptions with Whisper ---
    # Language and task are passed directly; `forced_decoder_ids` is the
    # deprecated way of selecting them.
    with torch.no_grad():
        outputs = model.generate(
            input_features,
            return_dict_in_generate=True,
            output_scores=True,
            language="en",
            task="transcribe",
        )

    # Decode the generated tokens
    decoded = processor.tokenizer.batch_decode(outputs.sequences, skip_special_tokens=True)

    # --- Step 6: Compute confidence scores ---
    # `outputs.scores` holds one logits tensor per generated step, assumed to
    # be (batch, vocab) as with the default (non-beam) decoding. The
    # confidence of a clip is the mean, over steps, of the highest token
    # probability in that clip's row. The maximum is taken along the vocab
    # axis only, so each clip gets its own value.
    # NOTE(review): steps after a clip has finished are still part of the
    # mean; mask them out if a stricter per-token confidence is required.
    scores = outputs.scores
    if scores:
        step_max_probs = torch.stack(
            [F.softmax(step_logits.float(), dim=-1).max(dim=-1).values for step_logits in scores]
        )  # (steps, batch)
        confidences = step_max_probs.mean(dim=0).tolist()
    else:
        confidences = [None] * len(batch_ids)

    # --- Step 7: Store results in memory ---
    for id_, transcription, confidence, prob_w in zip(batch_ids, decoded, confidences, watermark_probs):
        results_batch.append({
            "id": id_,
            "unwatermarked_transcription": transcription,
            "unwatermarked_confidence": confidence,
            "prob_w": prob_w
        })

    # --- Step 8: Update progress ---
    batch_num += 1
    remaining_clips = remaining_clips[batch_size:]  # Remove processed clips

    # --- Step 9: Save batch results every N batches or at the end ---
    if (batch_num % save_every == 0) or (len(remaining_clips) == 0):
        if results_batch:
            batch_df = pd.DataFrame(results_batch)
            all_results.append(batch_df)
            # Number the files by checkpoint count so that the last, partial
            # checkpoint does not overwrite the previous full one.
            batch_df.to_csv(
                os.path.join(results_path, f"results_batch_{len(all_results)}.csv"),
                index=False,
            )
            results_batch = []  # Reset batch storage

# --- Step 10: Save final concatenated result (optional) ---
if all_results:
    final_df = pd.concat(all_results, ignore_index=True)
    final_df.to_csv(os.path.join(results_path, "final_results.csv"), index=False)
else:
    # Nothing was processed: keep an empty table in memory and leave any
    # existing final_results.csv untouched.
    final_df = pd.DataFrame(
        columns=["id", "unwatermarked_transcription", "unwatermarked_confidence", "prob_w"]
    )
    print("No clips were processed; final_results.csv was not written.")