In [1]:
!git clone https://github.com/shreyjain711/wm-tango.git

fatal: destination path 'wm-tango' already exists and is not an empty directory.


In [4]:
# Make the cloned repository the notebook's working directory, then print the
# directory to confirm the change took effect.
# NOTE(review): the absolute path is specific to this SageMaker instance —
# adjust it when running the notebook elsewhere.
%cd '/home/ec2-user/SageMaker/wm-tango'
!pwd

/home/ec2-user/SageMaker/wm-tango
/home/ec2-user/SageMaker/wm-tango


In [5]:
# Bring the local checkout up to date with its upstream branch.
!git pull

Already up to date.


In [None]:
!pip install setuptools==70.3.0
!pip install -r requirements.txt --quiet
!pip install jax==0.4.23 --quiet
!pip install jaxlib==0.4.23 --quiet



In [5]:
# # Environmental Sounds
# prompt_1 = "Rain falling softly on a tin roof during a quiet night"
# prompt_2 = "Waves crashing gently on a sandy beach"

# # Human Actions
# prompt_3 = "A crowd cheering and clapping at a concert"
# prompt_4 = "Footsteps echoing in a large empty hallway"

# # Musical or Instrumental Sounds
# prompt_5 = "A piano playing a calm and soft melody in a quiet room"
# prompt_6 = "An acoustic guitar being strummed near a campfire"

# # Animal Sounds
# prompt_7 = "Birds chirping in a dense forest at sunrise"
# prompt_8 = "A dog barking excitedly in a backyard"

# # Urban and Cityscapes
# prompt_9 = "The hum of traffic on a busy city street"
# prompt_10 = "Construction sounds with hammering and drilling"



# import IPython
# import soundfile as sf
# from tango import Tango

# tango = Tango("declare-lab/tango2")

# p_num = 2
# prompt = eval(f'prompt_{p_num}')
# audio1 = tango.generate(prompt, wm_flag=True)
# sf.write(f"wm_p{p_num}.wav", audio1, samplerate=16000)

# audio2 = tango.generate(prompt, wm_flag=False)
# sf.write(f"no_wm_p{p_num}.wav", audio2, samplerate=16000)

In [6]:
# import torch
# import numpy as np

# # Assuming necessary imports and initializations
# batch_size = 1
# audio_shape = (1, 1, 16000)  # Adjust based on TANGO's expected input shape
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# # Original noise
# noise = torch.randn(batch_size, *audio_shape, device=device)

# # Perform FFT
# F_noise = torch.fft.fft(noise)

# # Create a mask for the watermark (define create_watermark_mask accordingly)
# def create_watermark_mask(shape):
#     mask = torch.zeros(shape, dtype=torch.bool)
#     # Example: Set mask for specific frequency bands
#     mask[..., 100:200] = True
#     return mask

# mask = create_watermark_mask(F_noise.shape)

# # Define the watermark pattern
# watermark_pattern = torch.rand(mask.sum(), device=device)

# # Embed the watermark
# F_noise[mask] = watermark_pattern

# # Inverse FFT to get the watermarked noise
# watermarked_noise = torch.fft.ifft(F_noise).real


In [7]:
# In TANGO's generation code
# Original noise initialization
# noise = torch.randn(batch_size, *audio_shape, device=device)

# Use watermarked noise instead
# noise = watermarked_noise


In [8]:
import json
import torch
from tqdm import tqdm
from huggingface_hub import snapshot_download
from models import AudioDiffusion, DDPMScheduler
from audioldm.audio.stft import TacotronSTFT
from audioldm.variational_autoencoder import AutoencoderKL


class AudioDiffusionInversion:
    """Wraps a pretrained Tango checkpoint to map an audio waveform to latents
    and run those latents through the UNet in reversed timestep order.

    The constructor downloads the checkpoint snapshot and builds the VAE, the
    STFT front end, the diffusion model and the scheduler; ``invert`` is the
    main entry point.
    """

    def __init__(self, name="declare-lab/tango", device="cuda:0"):
        """
        Downloads the checkpoint and loads all sub-models onto ``device``.
        :param name: Hugging Face Hub repository id of the checkpoint.
        :param device: Device the VAE, STFT and diffusion model are moved to.
        """
        self.device = device

        path = snapshot_download(repo_id=name)

        def read_json(filename):
            # Use a context manager so the config file handle is closed
            # immediately instead of being left to the garbage collector.
            with open("{}/{}".format(path, filename)) as handle:
                return json.load(handle)

        vae_config = read_json("vae_config.json")
        stft_config = read_json("stft_config.json")
        main_config = read_json("main_config.json")

        self.vae = AutoencoderKL(**vae_config).to(device)
        self.stft = TacotronSTFT(**stft_config).to(device)
        self.model = AudioDiffusion(**main_config).to(device)

        vae_weights = torch.load("{}/pytorch_model_vae.bin".format(path), map_location=device)
        stft_weights = torch.load("{}/pytorch_model_stft.bin".format(path), map_location=device)
        main_weights = torch.load("{}/pytorch_model_main.bin".format(path), map_location=device)

        self.vae.load_state_dict(vae_weights)
        self.stft.load_state_dict(stft_weights)
        self.model.load_state_dict(main_weights)

        print ("Successfully loaded checkpoint from:", name)

        # Inference only: put every sub-model in eval mode.
        self.vae.eval()
        self.stft.eval()
        self.model.eval()

        self.scheduler = DDPMScheduler.from_pretrained(main_config["scheduler_name"], subfolder="scheduler")

    @torch.no_grad()
    def get_latents_from_audio(self, audio_waveform):
        """
        Encodes audio into latents using STFT and VAE.
        :param audio_waveform: Input waveform tensor to encode; a leading
            dimension is added before it is passed to the STFT
            (presumably a 1-D sample vector — TODO confirm).
        :return: A sample drawn from the VAE's first-stage encoding of the
            mel spectrogram.
        """
        # Only the first of the three values returned by the STFT is used.
        mel_spectrogram, _, _ = self.stft.mel_spectrogram(audio_waveform.unsqueeze(0).to(self.device))
        # A further leading dimension is added before VAE encoding.
        latents = self.vae.encode_first_stage(mel_spectrogram.unsqueeze(0)).sample()
        return latents

    @torch.no_grad()
    def backward_diffusion(self, latents, num_inference_steps=50):
        """
        Runs the latents through the UNet over the scheduler's timesteps in
        reversed order, rescaling them after every step.

        :param latents: Initial latent variables (e.g. from ``get_latents_from_audio``).
        :param num_inference_steps: Number of scheduler timesteps to iterate over.
        :return: Latents after the final step.
        """
        self.scheduler.set_timesteps(num_inference_steps, device=latents.device)
        timesteps = self.scheduler.timesteps

        # Walk the scheduler's timesteps in the opposite order to the one the
        # scheduler provides.
        for t in tqdm(reversed(timesteps), desc="Reversing Diffusion"):
            # Cumulative alpha product at this timestep and its complement.
            alpha_prod_t = self.scheduler.alphas_cumprod[t]
            beta_prod_t = 1 - alpha_prod_t

            # No text conditioning is supplied to the UNet.
            # NOTE(review): confirm the UNet accepts encoder_hidden_states=None.
            noise_pred = self.model.unet(
                latents, t, encoder_hidden_states=None
            ).sample

            # Subtract the scaled noise prediction, then divide by sqrt(alpha).
            # NOTE(review): this expression looks like the usual x0 estimate
            # rather than a DDIM-style inversion step towards higher noise
            # levels — confirm this is the intended update rule.
            latents = (
                (latents - beta_prod_t.sqrt() * noise_pred)
                / alpha_prod_t.sqrt()
            )

        return latents

    @torch.no_grad()
    def reconstruct_audio(self, latents):
        """
        Reconstructs audio waveform from latents.
        :param latents: Latent variables.
        :return: Reconstructed audio waveform.
        """
        mel_spectrogram = self.vae.decode_first_stage(latents)
        audio_waveform = self.vae.decode_to_waveform(mel_spectrogram)
        return audio_waveform

    def invert(self, audio_waveform, num_inference_steps=50):
        """
        Inverts an audio waveform through the diffusion pipeline.
        :param audio_waveform: Input waveform to invert.
        :param num_inference_steps: Number of diffusion steps.
        :return: The latents produced by ``backward_diffusion``; no waveform
            is reconstructed here (see ``reconstruct_audio``).
        """
        latents = self.get_latents_from_audio(audio_waveform)
        noise_latents = self.backward_diffusion(latents, num_inference_steps=num_inference_steps)
        return noise_latents

# Example Usage:
# Initialize your model, scheduler, and STFT processor.
# tango = Tango(name="declare-lab/tango")

import numpy as np
import librosa
import matplotlib.pyplot as plt
from scipy.spatial.distance import euclidean


def load_audio(file_path):
    """Read an audio file with librosa, keeping its original sampling rate.

    :param file_path: Path of the audio file to read.
    :return: The ``(samples, sampling_rate)`` pair produced by ``librosa.load``.
    """
    # sr=None asks librosa not to resample to its default rate.
    return librosa.load(file_path, sr=None)

# Build the inversion pipeline with its default checkpoint ("declare-lab/tango")
# and default device ("cuda:0"); construction downloads the snapshot and loads
# the weights, so this cell can take a while.
inversion_pipeline = AudioDiffusionInversion()

Fetching 9 files:   0%|          | 0/9 [00:00<?, ?it/s]

  fft_window = pad_center(fft_window, filter_length)
  mel_basis = librosa_mel_fn(


UNet initialized randomly.


Some weights of the model checkpoint at google/flan-t5-large were not used when initializing T5EncoderModel: ['decoder.block.11.layer.1.EncDecAttention.k.weight', 'decoder.block.9.layer.0.SelfAttention.k.weight', 'decoder.block.23.layer.0.layer_norm.weight', 'decoder.block.15.layer.0.SelfAttention.k.weight', 'decoder.block.19.layer.2.DenseReluDense.wo.weight', 'decoder.block.3.layer.2.DenseReluDense.wi_0.weight', 'decoder.block.5.layer.2.DenseReluDense.wi_1.weight', 'decoder.block.0.layer.2.DenseReluDense.wo.weight', 'decoder.block.1.layer.0.SelfAttention.o.weight', 'decoder.block.18.layer.0.SelfAttention.o.weight', 'decoder.block.16.layer.0.layer_norm.weight', 'decoder.block.11.layer.2.DenseReluDense.wo.weight', 'decoder.block.16.layer.2.layer_norm.weight', 'decoder.block.2.layer.1.EncDecAttention.q.weight', 'decoder.block.21.layer.0.SelfAttention.o.weight', 'decoder.block.17.layer.2.DenseReluDense.wo.weight', 'decoder.block.13.layer.1.EncDecAttention.o.weight', 'decoder.block.22.laye

Successfully loaded checkpoint from: declare-lab/tango


In [None]:
# Load a previously generated sample at its native sampling rate; `sr` is not
# used further in this cell.
# NOTE(review): no resampling is done here — confirm the file's rate matches
# what the STFT front end expects.
waveform, sr = load_audio('generated_samples/wm_p1_n1_c7.wav') 
# Convert the loaded samples to a torch tensor, as the pipeline calls tensor
# methods (.unsqueeze/.to) on its input.
waveform = torch.tensor(waveform)
# Run the waveform through the inversion pipeline; the result is the latents
# returned by backward_diffusion.
noise = inversion_pipeline.invert(waveform, num_inference_steps=50)
