# Diffusion Models for Audio Semantic Communication

### Prerequisites

Run the following cell once to install the required dependencies and download the pretrained Tango weights.

In [None]:
!pip install librosa==0.9.2
!pip install huggingface_hub==0.13.3
!pip install einops==0.6.1
!pip install transformers==4.27.0
!pip install progressbar
!pip install pandas
!pip install matplotlib
!pip install torchmetrics
!pip install frechet-audio-distance

# Install PyTorch version 1.13.1 with CUDA 11.7 support
!pip install torch==1.13.1+cu117 torchvision==0.14.1+cu117 torchaudio==0.13.1 --extra-index-url https://download.pytorch.org/whl/cu117

%cd diffusers/
!pip install -e .
    
!git clone https://github.com/declare-lab/tango
%cd tango
%cd 'diffusers'
!pip install -e .
%cd ../tango/
%mkdir weights
%cd weights
!git clone https://huggingface.co/declare-lab/tango-full-ft-audiocaps

## Initialize tango

In [1]:
# Switch into the tango checkout before importing.
# NOTE(review): `models` and `audioldm` are presumably packages of that checkout that
# only resolve with it as the working directory -- confirm. The default weights path
# used by the Tango wrapper ('./weights/...') is also relative to this directory.
%cd tango
import os

import json
import torch
from tqdm import tqdm
from huggingface_hub import snapshot_download
from models import AudioDiffusion, DDPMScheduler
from audioldm.audio.stft import TacotronSTFT
from audioldm.variational_autoencoder import AutoencoderKL
import IPython
import soundfile as sf

/mnt/media/christian/DiffInpainting/tango


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]
  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class Tango:
    """Inference wrapper around a pretrained Tango checkpoint.

    Builds the VAE, the STFT module and the diffusion model from the JSON
    configs found in a checkpoint directory, loads their weights, switches
    them to eval mode and exposes text-to-audio generation helpers.
    """

    def __init__(self, path_to_weights='', device="cuda:0"):
        """Load configs and weights from ``path_to_weights`` onto ``device``.

        Args:
            path_to_weights: Directory holding ``vae_config.json``,
                ``stft_config.json``, ``main_config.json`` and the three
                ``pytorch_model_*.bin`` files. An empty string selects
                ``./weights/tango-full-ft-audiocaps`` (relative to the
                current working directory).
            device: Device the modules and their weights are placed on.
        """
        if path_to_weights == '':
            path_to_weights = './weights/tango-full-ft-audiocaps'

        # Read the configs through a context manager so the file handles are
        # closed deterministically instead of being left to the garbage collector.
        vae_config = self._load_json(os.path.join(path_to_weights, "vae_config.json"))
        stft_config = self._load_json(os.path.join(path_to_weights, "stft_config.json"))
        main_config = self._load_json(os.path.join(path_to_weights, "main_config.json"))

        self.vae = AutoencoderKL(**vae_config).to(device)
        self.stft = TacotronSTFT(**stft_config).to(device)
        self.model = AudioDiffusion(**main_config).to(device)

        # NOTE(security): torch.load unpickles the file, which can execute arbitrary
        # code -- only point this at checkpoints obtained from a trusted source.
        vae_weights = torch.load(os.path.join(path_to_weights, "pytorch_model_vae.bin"), map_location=device)
        stft_weights = torch.load(os.path.join(path_to_weights, "pytorch_model_stft.bin"), map_location=device)
        main_weights = torch.load(os.path.join(path_to_weights, "pytorch_model_main.bin"), map_location=device)

        self.vae.load_state_dict(vae_weights)
        self.stft.load_state_dict(stft_weights)
        self.model.load_state_dict(main_weights)

        print ("Successfully loaded checkpoint from:", path_to_weights)

        # Inference only: put every module in eval mode.
        self.vae.eval()
        self.stft.eval()
        self.model.eval()

        self.scheduler = DDPMScheduler.from_pretrained(main_config["scheduler_name"], subfolder="scheduler")

    @staticmethod
    def _load_json(path):
        """Parse the JSON file at ``path`` and return the result, closing the file."""
        with open(path) as handle:
            return json.load(handle)

    def chunks(self, lst, n):
        """ Yield successive n-sized chunks from a list (the last one may be shorter). """
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    def generate(self, prompt, steps=100, guidance=3, samples=1, disable_progress=True):
        """Generate audio for a single prompt string.

        Args:
            prompt: Text prompt.
            steps, guidance, samples, disable_progress: Forwarded unchanged to
                ``self.model.inference``.

        Returns:
            The first decoded waveform (``wave[0]``); any further waveforms
            produced for the prompt are not returned.
        """
        with torch.no_grad():
            latents = self.model.inference([prompt], self.scheduler, steps, guidance, samples, disable_progress=disable_progress)
            mel = self.vae.decode_first_stage(latents)
            wave = self.vae.decode_to_waveform(mel)
        return wave[0]

    def generate_for_batch(self, prompts, steps=100, guidance=3, samples=1, batch_size=8, disable_progress=True):
        """Generate audio for a list of prompt strings.

        Prompts are processed ``batch_size`` at a time.

        Args:
            prompts: List of text prompts.
            steps, guidance, samples, disable_progress: Forwarded unchanged to
                ``self.model.inference``.
            batch_size: Number of prompts handed to the model per call.

        Returns:
            A flat list of waveforms when ``samples == 1``; otherwise a list of
            lists in which consecutive waveforms are grouped ``samples`` at a time.
        """
        outputs = []
        for k in tqdm(range(0, len(prompts), batch_size)):
            batch = prompts[k: k+batch_size]
            with torch.no_grad():
                latents = self.model.inference(batch, self.scheduler, steps, guidance, samples, disable_progress=disable_progress)
                mel = self.vae.decode_first_stage(latents)
                wave = self.vae.decode_to_waveform(mel)
                outputs.extend(wave)
        if samples == 1:
            return outputs
        else:
            return list(self.chunks(outputs, samples))

In [3]:
# Build the wrapper with its defaults: weights from './weights/tango-full-ft-audiocaps'
# (relative to the current working directory) on device "cuda:0".
tango = Tango()

  fft_window = pad_center(fft_window, filter_length)
  mel_basis = librosa_mel_fn(


UNet initialized randomly.


Some weights of the model checkpoint at google/flan-t5-large were not used when initializing T5EncoderModel: ['decoder.block.9.layer.1.layer_norm.weight', 'decoder.block.20.layer.1.EncDecAttention.k.weight', 'decoder.block.0.layer.2.DenseReluDense.wo.weight', 'decoder.block.21.layer.0.SelfAttention.q.weight', 'decoder.block.5.layer.0.SelfAttention.q.weight', 'decoder.block.11.layer.1.layer_norm.weight', 'decoder.block.14.layer.2.DenseReluDense.wo.weight', 'decoder.block.18.layer.1.EncDecAttention.q.weight', 'decoder.block.23.layer.2.DenseReluDense.wo.weight', 'decoder.block.7.layer.0.SelfAttention.o.weight', 'decoder.block.15.layer.1.layer_norm.weight', 'decoder.block.22.layer.0.SelfAttention.k.weight', 'decoder.block.15.layer.1.EncDecAttention.v.weight', 'decoder.block.8.layer.1.EncDecAttention.q.weight', 'decoder.block.15.layer.1.EncDecAttention.q.weight', 'decoder.block.21.layer.2.layer_norm.weight', 'decoder.block.2.layer.0.SelfAttention.q.weight', 'decoder.block.1.layer.1.layer_no

Successfully loaded checkpoint from: ./weights/tango-full-ft-audiocaps


## DDNM+ & RePaint

Change the following parameters to test DDNM+ on the denoising or inpainting task.

You should first download the AudioCaps dataset and change the paths below accordingly. Please note that different seeds can impact the quality of final results.

In [4]:
# Move to the sibling `versions` directory: the relative paths configured in the next
# cell ('../output', '../AudioCaps') are resolved from here.
# NOTE(review): presumably this is also where the `utils` module imported below lives -- confirm.
%cd ../versions

/mnt/media/christian/DiffInpainting/versions


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [5]:
# Experiment configuration read by the inference and evaluation cells below.
model = 'ddnm+'                         # Restoration method: 'ddnm+' or 'repaint'
snr = 20                                # Noise level passed to utils.apply_noise for the audio latents (values tried: 17.5, 20, 30). NOTE(review): described as PSNR elsewhere -- confirm which measure is meant
noisy_prompts=True                      # Whether to apply noise to text prompts or not (only forwarded in the 'ddnm+' branch)
prompt_psnr = snr                       # Amount of noise applied to the prompt embedding (forwarded as psnr_prompts); tied to snr by default
time_mask_percentage = (0.45, 0.55)     # Inpainting mask passed to utils.get_mask; set (0., 0.) when not inpainting. Presumably (start, end) fractions of the time axis -- TODO confirm
mask_type = 'time'                      # Kind of mask requested from utils.get_mask for the latents ('mel' is used separately for the spectrogram plot)
print_report = False                    # True to print final report
num_samples = 1                         # Number of audio files with which to test the method
save_output_audio = True                # Whether to save the generated audio file
audio_description = 'ddnmp_snr20'       # Description passed to utils.save_audio as `descr` (used for the saved files' names)
save_noisy_audio = False                # Whether to save noisy latents converted to audio or not
caption_filter = None                   # Search for specific words in the captions (list of words); passed to utils.get_d_audios

name_experiment = 'DDNM+ inpaint SNR20 test'    # Sub-folder of output_path that receives this run's files
output_path = '../output'                       # Root folder for all experiment outputs
dataset_path = '../AudioCaps'                   # Dataset root; val.csv is read from here
dataset_subset = 'AudioCaps_Val'                # Sub-folder of dataset_path containing the .wav files

In [6]:
import pandas as pd
import shutil
import utils


# Output layout: <output_path>/<name_experiment>/{generated,noisy,clean,spectrograms}
experiment_path = os.path.join(output_path, name_experiment)
output_path_generated = os.path.join(experiment_path, 'generated')
output_path_noisy = os.path.join(experiment_path, 'noisy')
output_path_clean = os.path.join(experiment_path, 'clean')
output_path_spectrograms = os.path.join(experiment_path, 'spectrograms')

usecols = ["youtube_id", "start_time", "caption"]
file_list = pd.read_csv(os.path.join(dataset_path, "val.csv"), index_col="youtube_id", usecols=usecols)

d_audios = utils.get_d_audios(file_list, num_to_select=num_samples,
                              dataset_path=os.path.join(dataset_path, dataset_subset),
                              caption_filter=caption_filter, max_f=200)

# Create every folder independently: this also works when `output_path` does not exist
# yet or when a previous run left the experiment folder only partially populated.
for directory in (output_path_generated, output_path_noisy, output_path_clean, output_path_spectrograms):
    os.makedirs(directory, exist_ok=True)

inference_with_mask = utils.get_version(model)

for i in range(num_samples):
    original_audio_paths = [os.path.join(dataset_path, dataset_subset, f"{d_audios[i]['Index']}_{d_audios[i]['start_time']}.wav")]
    caption = d_audios[i]['caption']
    # Keep a copy of the clean reference next to the generated audio for evaluation.
    shutil.copyfile(original_audio_paths[0], os.path.join(output_path_clean, f"{i}.wav"))

    # Get the embeddings of the original audio
    _, original_latents = utils.get_original_latents(original_audio_paths, tango)

    # Apply noise to the latents
    noisy_latents = utils.apply_noise(original_latents, snr, verbose=False)

    # Get the mask (time or mel) to apply to the latents (needed for inpainting).
    # NOTE(review): the branches below test `mask is None`, so utils.get_mask presumably
    # returns None when time_mask_percentage is (0., 0.) -- confirm.
    mask = utils.get_mask(time_mask_percentage, mask_type=mask_type)

    # Set the sigma_y value
    sigma_y = utils.get_sigma_y(noisy_latents, mask)

    with torch.no_grad():

        if model == 'ddnm+':
            # Without a mask the clean latents are passed as `original`, with a mask the
            # noisy ones. NOTE(review): confirm this is intended for the denoising-only case.
            latents = inference_with_mask(tango.model, [caption], tango.scheduler,
                                          num_steps=1000, guidance_scale=3, num_samples_per_prompt=1, disable_progress=False,
                                          mask=mask, original=original_latents if mask is None else noisy_latents,
                                          sigma_y=sigma_y, travel_length=0,
                                          noisy_prompts=noisy_prompts, psnr_prompts=prompt_psnr)
        elif model == 'repaint' and mask is not None:
            latents = inference_with_mask(tango.model, [caption], tango.scheduler,
                                          t_T=1000, guidance_scale=3, num_samples_per_prompt=1, disable_progress=False,
                                          mask=mask, original=original_latents)
        else:
            # Fail loudly: continuing would either hit an undefined `latents` or silently
            # decode the latents left over from the previous iteration.
            raise ValueError(
                "Please select a proper method for the desired task "
                f"(model={model!r}, mask is None: {mask is None})"
            )

        output_mels = tango.vae.decode_first_stage(latents)
        waves = tango.vae.decode_to_waveform(output_mels)

        if save_output_audio:
            utils.save_audio(output_path_generated, waves, id_file=f'{i}', descr=audio_description)

        # Mel spectrograms of the clean and noisy latents are needed for the plot below.
        original_mels = tango.vae.decode_first_stage(original_latents)
        noisy_mels = tango.vae.decode_first_stage(noisy_latents)

        # The noisy waveform is only needed when it is written to disk.
        if save_noisy_audio:
            noisy_waves = tango.vae.decode_to_waveform(noisy_mels)
            utils.save_audio(output_path_noisy, noisy_waves, id_file=f'{i}', descr=audio_description)

    mel_mask = utils.get_mask(time_mask_percentage, mask_type='mel')
    utils.plot_spectrogram(original_mels, noisy_mels, output_mels, mel_mask=mel_mask, save_fig=True, id_s=i, output_path=output_path_spectrograms)


DDNM+


100%|██████████████████████████████████████████████████| 1000/1000 [07:13<00:00,  2.31it/s]


<Figure size 1200x300 with 0 Axes>

## Evaluate

In [8]:
import utils

# Sub-folders of the experiment directory that are compared, in the order they are
# handed to utils.compute_metrics: generated audio first, clean copies second.
folders = ['generated', 'clean']
generated_dir, clean_dir = (os.path.join(output_path, name_experiment, sub) for sub in folders)

# Audio files listed here are left out of the metrics computation.
exclude_audios = []

snr_all, sdr_all, fad = utils.compute_metrics(generated_dir, clean_dir, exclude=exclude_audios)
utils.print_metrics_report(snr_all, sdr_all, fad)

Using cache found in /home/christian/.cache/torch/hub/harritaylor_torchvggish_master


[1m             Results [0m 
         SNR        SDR      
mean   -2.2181   -10.5631  
std     0.0895     0.4068    
min    -2.3077   -10.9699  
max    -2.1286    -10.1563   

FAD          4.2125               
