<a href="https://colab.research.google.com/github/mayibongwemoyo/dawm/blob/main/new_pfb_new_detector_gm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
import sys
!pip install torchaudio==0.13.1
!pip install soundfile datasets librosa pandas seaborn matplotlib scipy sklearn
!pip install datasets
!pip install audioseal
from google.colab import drive
drive.mount('/content/drive')

import sys
sys.path.append('/content/drive/MyDrive/dawn/examples')
# from datasets import load_dataset
import torch
import torchaudio
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from scipy import signal
# import urllib.request
import urllib
import io
from audioseal import AudioSeal

In [2]:
# import sys
# import torch
# import torchaudio
# import numpy as np
# import pandas as pd
# from audioseal import AudioSeal
# from scipy import stats # Keep for potential future statistical tests

# Experiment-wide configuration.
NUM_WATERMARKS = 4          # how many sequential watermark steps to apply per clip
SAMPLE_RATE = 16000         # working sample rate (Hz) used for all processing
sr = SAMPLE_RATE            # Alias for convenience

# Load the pretrained AudioSeal generator (embeds) and detector (extracts).
# Every later cell needs both models, so a load failure is reported and then
# re-raised: continuing would only surface as a confusing NameError further down.
try:
    generator = AudioSeal.load_generator("audioseal_wm_16bits")
    detector = AudioSeal.load_detector("audioseal_detector_16bits")
    print("AudioSeal models loaded successfully.")
except Exception as e:
    print(f"Error loading AudioSeal models: {e}")
    raise

def preprocess_audio(audio, sr):
    """Convert audio to the notebook's standard format: mono float32 at SAMPLE_RATE.

    Args:
        audio: Waveform as a ``torch.Tensor``, a numpy array, or any object
            ``torch.as_tensor`` accepts. Supported layouts:
            ``(T,)``, ``(C, T)``, ``(T, C)`` and ``(B, C, T)``.
            For 2D input the shorter axis is taken to be the channel axis
            (a square input is read as ``(C, T)``).
        sr (int): Sample rate of ``audio`` in Hz.

    Returns:
        torch.Tensor: float32 tensor of shape ``(1, 1, T')`` for 1D/2D input
        (``(B, 1, T')`` for 3D input), resampled to ``SAMPLE_RATE`` and scaled
        down so that the peak absolute amplitude does not exceed 1.0.

    Raises:
        ValueError: If ``audio`` is empty or is not 1-, 2- or 3-dimensional.
    """
    if not isinstance(audio, torch.Tensor):
        audio = torch.as_tensor(audio)

    # Cast first: channel averaging and resampling below need a floating dtype.
    audio = audio.float()

    if audio.numel() == 0:
        raise ValueError("Audio is empty.")

    # Bring everything to (batch, channels, time).
    if audio.dim() == 1:
        audio = audio.unsqueeze(0).unsqueeze(0)  # (T,) -> (1, 1, T)
    elif audio.dim() == 2:
        # Audio has far more samples than channels, so the longer axis is time.
        # This keeps (1, T) mono intact and transposes (T, C) instead of
        # averaging over time.
        if audio.shape[0] > audio.shape[1]:
            audio = audio.transpose(0, 1).contiguous()  # (T, C) -> (C, T)
        audio = audio.unsqueeze(0)  # (C, T) -> (1, C, T)
    elif audio.dim() != 3:
        raise ValueError(
            f"Audio must be 1D, 2D or 3D, but got shape {tuple(audio.shape)}"
        )

    # Down-mix multi-channel audio to mono by averaging the channels.
    if audio.shape[1] > 1:
        audio = audio.mean(dim=1, keepdim=True)

    # Resample to the working sample rate.
    if sr != SAMPLE_RATE:
        resampler = torchaudio.transforms.Resample(sr, SAMPLE_RATE)
        audio = resampler(audio)

    # Only scale down when the signal would otherwise exceed [-1, 1];
    # quieter signals are left untouched.
    max_val = torch.max(torch.abs(audio))
    if max_val > 1.0:
        print(f"Warning: Audio amplitude max {max_val} > 1.0. Normalizing.")
        audio = audio / max_val
    elif max_val == 0:
        print("Warning: Audio signal is silent.")

    return audio

def calculate_snr(original, watermarked):
    """Compute the signal-to-noise ratio, in dB, of ``watermarked`` relative to ``original``.

    The noise is ``watermarked - original``; powers are mean squared values.

    Args:
        original (torch.Tensor): Reference signal.
        watermarked (torch.Tensor): Modified signal, broadcast-compatible with
            ``original``.

    Returns:
        float: ``10 * log10(signal_power / noise_power)``, with two sentinels:
        ``100.0`` when the noise power is below 1e-10 (signals effectively
        identical, including the case where both are silent), and ``-100.0``
        when the signal power is below 1e-10 but the noise power is not.
    """
    epsilon = 1e-10  # power floor below which a term is treated as zero

    noise = watermarked - original
    signal_power = torch.mean(original.pow(2))
    noise_power = torch.mean(noise.pow(2))

    # Checked first, so this also covers "both signal and noise are ~zero".
    if noise_power < epsilon:
        print("Warning: Noise power is near zero. Setting SNR to infinity (e.g., 100 dB).")
        return 100.0

    # Reaching here means noise_power >= epsilon, so a silent reference gives
    # an SNR of effectively -infinity.
    if signal_power < epsilon:
        print("Warning: Signal power is near zero. SNR might be misleading or infinite.")
        return -100.0

    snr = 10 * torch.log10(signal_power / (noise_power + epsilon))
    return snr.item()

AudioSeal models loaded successfully.


In [3]:
def embed_pfb(audio, sr, step, alpha=0.5, num_bands=4, message=None):
    """Apply one watermark step (Parallel Frequency Bands - step-wise).

    The signal's FFT is split into ``num_bands`` contiguous chunks, the real
    part of chunk ``step % num_bands`` is passed through the AudioSeal
    generator, and the spectrum is reassembled and transformed back.

    Args:
        audio (torch.Tensor): Waveform of shape ``(1, 1, T)``.
        sr (int): Sample rate passed to the generator.
        step (int): 0-indexed watermark step; selects the band to modify.
        alpha (float): Watermark strength passed to the generator.
        num_bands (int): Number of spectrum chunks.
        message (torch.Tensor, optional): Message bits forwarded to the
            generator so the embedded payload is known to the caller. When
            ``None`` the generator picks the message itself (assumed to be
            random bits per the AudioSeal API - TODO confirm).

    Returns:
        torch.Tensor: Watermarked waveform of shape ``(1, 1, T)``.

    Raises:
        TypeError: If ``audio`` is not a tensor.
        ValueError: If ``audio`` is not ``(1, 1, T)`` or is too short to be
            split into ``num_bands`` chunks.
    """
    if not isinstance(audio, torch.Tensor):
        raise TypeError("Input audio must be a PyTorch tensor.")
    if audio.dim() != 3 or audio.shape[0] != 1 or audio.shape[1] != 1:
        raise ValueError(f"Input audio must have shape (1, 1, T), but got {audio.shape}")

    # FFT of the 1D signal (T,).
    # NOTE(review): this is the full two-sided FFT. For a real signal the upper
    # half of the spectrum mirrors the lower half, so chunks in the upper half
    # are not independent frequency bands, and `.real` after the inverse FFT
    # discards part of a one-sided modification. Consider rfft/irfft, changed
    # together with the detection side.
    fft_audio = torch.fft.fft(audio.squeeze(0).squeeze(0))
    bands = torch.chunk(fft_audio, num_bands, dim=-1)

    # torch.chunk may return fewer chunks than requested for short inputs;
    # fail with a clear message instead of an IndexError below.
    if len(bands) != num_bands:
        raise ValueError(
            f"Audio with {fft_audio.shape[-1]} samples cannot be split into "
            f"{num_bands} bands (got {len(bands)})."
        )

    target_band_idx = step % num_bands
    band_to_watermark = bands[target_band_idx]

    # The generator takes a real-valued (B, C, T)-shaped input, so only the real
    # part of the band is watermarked; the imaginary part is carried over as-is.
    real_part = band_to_watermark.real.unsqueeze(0).unsqueeze(0)  # (1, 1, T_band)
    watermarked_real_part = generator(
        real_part, sample_rate=sr, message=message, alpha=alpha
    )
    watermarked_complex_band = (
        watermarked_real_part.squeeze(0).squeeze(0) + 1j * band_to_watermark.imag
    )

    watermarked_bands = list(bands)
    watermarked_bands[target_band_idx] = watermarked_complex_band

    # Reconstruct the full spectrum and return to the time domain.
    reconstructed_fft = torch.cat(watermarked_bands, dim=-1)
    watermarked_audio_1d = torch.fft.ifft(reconstructed_fft).real

    # Reshape back to (1, 1, T).
    return watermarked_audio_1d.unsqueeze(0).unsqueeze(0)

In [4]:
def detect_pfb(watermarked_audio, sr, step, message, num_bands=4, detection_threshold=0.5):
    """
    Detect watermark in a specific frequency band (Parallel Frequency Bands).

    The signal's FFT is split into ``num_bands`` chunks, every chunk except
    ``step % num_bands`` is zeroed in place, and the inverse FFT of that
    band-isolated spectrum is passed to the AudioSeal detector.

    Args:
        watermarked_audio (torch.Tensor): The audio signal possibly containing watermarks (shape 1, 1, T).
        sr (int): Sample rate.
        step (int): The watermark step (determines which band to check, 0-indexed).
        message (torch.Tensor): The message bits to compare the decoded bits
                                against when computing the bit error rate.
        num_bands (int): Number of frequency bands used during embedding.
        detection_threshold (float): Confidence threshold for successful detection.

    Returns:
        tuple: (detection_probability (float), is_detected (bool),
                detected_bits (torch.Tensor or None), ber (float))
               is_detected is True if detection_probability > detection_threshold.
               ber is the fraction of bits where ``message`` and the decoded
               bits differ. If the detector call raises, the failure state
               (0.0, False, None, 1.0) is returned after printing the error.

    Raises:
        TypeError: If ``watermarked_audio`` is not a tensor.
        ValueError: If the audio is not ``(1, 1, T)`` or is too short to be
            split into ``num_bands`` chunks.
    """
    if not isinstance(watermarked_audio, torch.Tensor):
        raise TypeError("Input audio must be a PyTorch tensor.")
    if watermarked_audio.dim() != 3 or watermarked_audio.shape[0] != 1 or watermarked_audio.shape[1] != 1:
        raise ValueError(f"Input audio must have shape (1, 1, T), but got {watermarked_audio.shape}")

    fft_audio = torch.fft.fft(watermarked_audio.squeeze(0).squeeze(0))  # FFT of the 1D signal (T,)
    bands = torch.chunk(fft_audio, num_bands, dim=-1)

    # torch.chunk may return fewer chunks than requested for short inputs.
    if len(bands) != num_bands:
        raise ValueError(
            f"Audio with {fft_audio.shape[-1]} samples cannot be split into "
            f"{num_bands} bands (got {len(bands)})."
        )

    target_band_idx = step % num_bands

    # Zero every band except the target, giving each zero block the size of the
    # band it replaces. Chunks can differ in length when T is not a multiple of
    # num_bands, so padding with copies of the target band's shape would change
    # the signal length and shift the band to the wrong frequency bins.
    isolated_bands = [
        band if idx == target_band_idx else torch.zeros_like(band)
        for idx, band in enumerate(bands)
    ]
    reconstructed_fft_single_band = torch.cat(isolated_bands, dim=-1)
    audio_single_band_1d = torch.fft.ifft(reconstructed_fft_single_band).real

    # Reshape for detector
    audio_to_detect = audio_single_band_1d.unsqueeze(0).unsqueeze(0)  # Shape (1, 1, T)

    # --- Detection ---
    # NOTE(review): this only stores an attribute on the shared detector object;
    # nothing in this function shows the detector reading it. Confirm against the
    # AudioSeal API whether it influences detection at all.
    detector.message = message
    try:
        # The standard detector is run on the isolated band's time-domain signal.
        # Note: Performance might be suboptimal as the detector wasn't trained for this.
        prob, detected_bits = detector.detect_watermark(audio_to_detect, sr)

        detection_prob = prob.item() if hasattr(prob, 'item') else float(prob)  # Ensure float
        is_detected = detection_prob > detection_threshold

        # Bit error rate of the decoded bits against the supplied message.
        ber = (message.round() != detected_bits.round()).float().mean().item()

        return detection_prob, is_detected, detected_bits.round(), ber

    except Exception as e:
        # Deliberate best-effort: report and return a failure state so a metric
        # sweep over many steps can continue.
        print(f"Error during detection for step {step} (Band {target_band_idx}): {e}")
        return 0.0, False, None, 1.0  # Return failure state

In [5]:
def calculate_metrics_pfb(original, watermarked, step, num_fake=10, num_bands=4, real_msg=None):
    """Calculate metrics for a single PFB watermark step.

    Args:
        original (torch.Tensor): Un-watermarked reference audio.
        watermarked (torch.Tensor): Audio after the watermark step(s).
        step (int): 0-indexed watermark step; selects the band that is checked.
        num_fake (int): Number of random "fake" messages used for the
            false-positive trials.
        num_bands (int): Number of frequency bands used during embedding.
        real_msg (torch.Tensor, optional): The message that was actually
            embedded, shape ``(1, 16)``. When ``None`` a random 16-bit message
            is drawn instead; that message was never given to the embedding
            step, so the reported BER is then not a measure of payload recovery.

    Returns:
        dict: ``method``, ``step`` (1-based), ``band_index`` (1-based), ``snr``,
        ``ber``, ``detection_prob``, ``is_detected`` and ``false_positive_rate``
        (``nan`` when ``num_fake`` is not positive, since no trials were run).
    """
    snr = calculate_snr(original, watermarked)

    # Real message detection for the specific band/step.
    if real_msg is None:
        real_msg = torch.randint(0, 2, (1, 16), dtype=torch.float32)
    else:
        real_msg = real_msg.float()  # float dtype keeps the BER comparison uniform
    prob_real, is_detected_real, _, ber_real = detect_pfb(
        watermarked, SAMPLE_RATE, step, real_msg, num_bands=num_bands
    )

    # Fake message detection (false positives) for the specific band/step.
    # NOTE(review): detect_pfb derives is_detected from the detector probability;
    # the message reaches the detector only via `detector.message`. If the
    # detector ignores that attribute, every trial below yields the same outcome
    # and this rate is not a true false-positive rate - confirm.
    false_positives_count = 0
    for _ in range(num_fake):
        fake_msg = torch.randint(0, 2, (1, 16), dtype=torch.float32)
        # Redraw until the fake message differs from the real one.
        while torch.equal(fake_msg, real_msg):
            fake_msg = torch.randint(0, 2, (1, 16), dtype=torch.float32)

        _, is_detected_fake, _, _ = detect_pfb(
            watermarked, SAMPLE_RATE, step, fake_msg, num_bands=num_bands
        )
        if is_detected_fake:
            false_positives_count += 1

    # Avoid ZeroDivisionError when no fake trials were requested.
    if num_fake > 0:
        false_positive_rate = false_positives_count / num_fake
    else:
        false_positive_rate = float("nan")

    return {
        "method": "PFB",
        "step": step + 1,  # 1-based indexing for reporting
        "band_index": (step % num_bands) + 1,
        "snr": snr,
        "ber": ber_real,
        "detection_prob": prob_real,
        "is_detected": is_detected_real,
        "false_positive_rate": false_positive_rate
    }

In [6]:
def process_audio_pfb(audio_data, sr, num_watermarks=NUM_WATERMARKS, alpha=0.5, num_bands=4):
    """Run the incremental PFB pipeline on one clip and collect per-step metrics.

    The clip is preprocessed once, then ``num_watermarks`` watermark steps are
    applied one after another to the same running signal. After each step the
    metrics are computed against the preprocessed original, so they are
    cumulative.

    Args:
        audio_data: Raw audio accepted by ``preprocess_audio``.
        sr (int): Sample rate of ``audio_data``.
        num_watermarks (int): Number of sequential embedding steps.
        alpha (float): Watermark strength forwarded to ``embed_pfb``.
        num_bands (int): Number of frequency bands.

    Returns:
        pandas.DataFrame: One row per step, as produced by
        ``calculate_metrics_pfb``.
    """
    reference = preprocess_audio(audio_data, sr)

    print(f"Processing audio with PFB (Num Watermarks: {num_watermarks}, Alpha: {alpha}, Bands: {num_bands})")
    # Sanity check: a signal compared with itself should hit the "no noise" sentinel.
    initial_snr = calculate_snr(reference, reference)
    print(f"Initial SNR: {initial_snr:.2f} dB")

    # Work on a copy so the reference stays pristine for the metric comparisons.
    running = reference.clone()
    rows = []

    for step in range(num_watermarks):
        print(f"--- Embedding Step {step+1} (Band { (step % num_bands) + 1 }) ---")

        # Preprocessing already resampled the clip, hence SAMPLE_RATE rather than `sr`.
        running = embed_pfb(running, SAMPLE_RATE, step, alpha=alpha, num_bands=num_bands)

        # Metrics describe the signal with watermark `step` (0-indexed) included.
        metrics = calculate_metrics_pfb(reference, running, step, num_bands=num_bands)
        rows.append(metrics)
        print(f"Step {step+1} Metrics: SNR={metrics['snr']:.2f}, BER={metrics['ber']:.2f}, DetProb={metrics['detection_prob']:.2f}, Detected={metrics['is_detected']}, FPR={metrics['false_positive_rate']:.2f}")

    print("-" * 20)
    return pd.DataFrame(rows)


In [7]:
# --- Example Usage ---
# Runs the PFB pipeline on a single clip streamed from the VoxPopuli
# validation split and prints the per-step metrics table.
#
# Offline alternative (5 s, 440 Hz sine wave) if the dataset is unavailable:
# duration_sec = 5
# frequency = 440
# t = torch.linspace(0, duration_sec, int(SAMPLE_RATE * duration_sec), dtype=torch.float32)
# dummy_audio_data = 0.5 * torch.sin(2 * torch.pi * frequency * t)

from datasets import load_dataset

dataset = load_dataset(
    "facebook/voxpopuli",
    "en",
    split="validation",
    streaming=True,
    trust_remote_code=True,
)

# Collect (waveform, sampling_rate) pairs; only the first example is taken.
audio_samples = []
for ex in dataset.take(1):
    clip = ex["audio"]
    audio_samples.append((clip["array"], clip["sampling_rate"]))

if not audio_samples:
    print("Could not load audio data.")
else:
    audio_data, original_sr = audio_samples[0]
    pfb_results_df = process_audio_pfb(audio_data, original_sr)
    print("\nPFB Results DataFrame:")
    print(pfb_results_df)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

voxpopuli.py:   0%|          | 0.00/8.84k [00:00<?, ?B/s]

Processing audio with PFB (Num Watermarks: 4, Alpha: 0.5, Bands: 4)
Initial SNR: 100.00 dB
--- Embedding Step 1 (Band 1) ---
Step 1 Metrics: SNR=54.91, BER=0.62, DetProb=0.00, Detected=False, FPR=0.00
--- Embedding Step 2 (Band 2) ---
Step 2 Metrics: SNR=54.72, BER=0.81, DetProb=0.03, Detected=False, FPR=0.00
--- Embedding Step 3 (Band 3) ---
Step 3 Metrics: SNR=54.55, BER=0.44, DetProb=0.04, Detected=False, FPR=0.00
--- Embedding Step 4 (Band 4) ---
Step 4 Metrics: SNR=51.61, BER=0.44, DetProb=0.00, Detected=False, FPR=0.00
--------------------

PFB Results DataFrame:
  method  step  band_index        snr     ber  detection_prob  is_detected  \
0    PFB     1           1  54.906578  0.6250        0.000758        False   
1    PFB     2           2  54.720856  0.8125        0.025644        False   
2    PFB     3           3  54.549477  0.4375        0.040262        False   
3    PFB     4           4  51.607113  0.4375        0.000815        False   

   false_positive_rate  
0       