# **Baseline System**

**Install required packages:**

In [None]:
"""
DO NOT MODIFY THIS BLOCK.
"""

# Install packages for template code.
! pip install GitPython gdown==5.1.0
# Install packages for Baseline Model.
# If this causes a 'pydevd_plugins' error, simply RESTART the SESSION to solve the problem.
! pip install librosa==0.9.2 pytorch-lightning==2.1.1 transformers==4.30.2 einops==0.7.0 torchlibrosa==0.0.9 ftfy==6.1.1 braceexpand==0.1.7 webdataset==0.2.75 wget==3.2 timm==0.4.12 wandb taming-transformers-rom1504==0.0.6



In [None]:
"""
DO NOT MODIFY THIS BLOCK.
"""
from typing import List,Dict,Tuple
from numpy import ndarray

from abc import ABC, abstractmethod
from tqdm import tqdm
from IPython import display

**Mount Drive, set up path, define prompts, etc.**

In [None]:
"""
DO NOT MODIFY THIS BLOCK.
This block is used for every submission.
"""
import os
import tqdm
import soundfile as sf
from google.colab import drive

# Mount Google Drive into the Colab file system at /content/gdrive.
drive.mount('/content/gdrive')
#ROOT_PATH = "/content/gdrive/MyDrive/DCASE2024-T7"
# Root folder on Drive: the captions CSV, the submission folder and the
# baseline implementation directory are all built from this path.
ROOT_PATH = "/content/gdrive/MyDrive/ML_Project/baseline"


# text_prompts_list = ["a buzzer is ringing with water in the background",
#     "a pig is grunting with water in the background",
#     "an alarm of a car door stayin open is ringin with crowd in the background",
#     "a small dog is whining with water in the background",
#     "a car horn is honking with crowd in the background",
#     "a baby is laughing with crowd in the background",
#     "a burglar alarm is ringing with traffic in the background"] # Example text prompts from Dev. Set.

# function to read text_prompts_list from 'caption' column of a CSV file
def read_text_prompts_from_csv(filepath: str) -> List[str]:
    """Load the text prompts stored in a CSV file.

    Args:
        filepath (str): location of the CSV file; it must have a 'caption' column.

    Returns:
        text_prompts (list of strings): the values of the 'caption' column, in file order.

    """
    # pandas is only needed by this helper, so it is imported lazily.
    import pandas as pd

    file_is_present = os.path.exists(filepath)
    assert file_is_present, f"File not found: {filepath}"

    table = pd.read_csv(filepath)
    captions = table['caption']
    return captions.tolist()

#text_prompts_list = read_text_prompts_from_csv(os.path.join(ROOT_PATH, 'dataset/dev/caption.csv')) # organizers will update this
# Prompts are taken from the 'caption' column of the development-set CSV.
text_prompts_list = read_text_prompts_from_csv(os.path.join(ROOT_PATH, 'development_dataset/captions.csv')) # organizers will update this


SR = 32000  # audio sample rate in Hz
duration = 4  # audio duration in seconds
submission_idx = 0  # organizers will update this

# Folder name is 'submission-' followed by the zero-padded two-digit index.
save_folder = f'submission-{submission_idx:02d}'
print(save_folder)
os.makedirs(os.path.join(ROOT_PATH, save_folder), exist_ok=True)  # exist_ok=True: an existing folder is reused instead of raising


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
submission-00


**Abstract class for sound synthesis model**

In [None]:
"""
DO NOT MODIFY THIS BLOCK.
YOU SHOULD SUBCLASS `SoundSynthesisModel` to wrap your model.
"""
class SoundSynthesisModel(ABC):
    """Interface that every sound synthesis model wrapper has to implement."""

    @abstractmethod
    def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
        """Generate one sound example for each of the given text prompts.

        Args:
            text_prompts (list of strings): the text prompts, one string per sound to generate.

        Return:
            sound_samples (dict): maps each text prompt to its generated sound sample.
                Every value should be a 1-dim array (mono signal) with sample_rate=32000.
                A model that does not run at 32,000Hz should resample inside this method.

        """

**Utility functions for pulling in the pretrained models and checkpoints required for baseline system.**

In [None]:
'''
DO NOT MODIFY THIS BLOCK.
'''

import os
import gdown
from git import Repo


def check_download_file_info(filename: str, shared_url: str, relative_dir: str, url_prefix: str) -> None:
    """Validate the arguments of a download request before any I/O is done.

    Args:
        filename (str): name of the file to create; must not contain '/'.
        shared_url (str): source URL; must start with `url_prefix`.
        relative_dir (str): destination directory; must not start with '/'.
        url_prefix (str): prefix that `shared_url` is required to start with.

    Raises:
        ValueError: if any of the three checks fails.
    """
    if not shared_url.startswith(url_prefix):
        raise ValueError(f"Invalid url: {shared_url}.\nMake sure the url is valid.\nIt should start with '{url_prefix}'.")
    # Any path separator is rejected, not only a leading one, so the file
    # always ends up directly inside the requested directory.
    if '/' in filename:
        raise ValueError(f"Invalid filename: {filename}.\nMake sure the filename does not contain '/'.")
    if relative_dir.startswith('/'):
        raise ValueError(f"Invalid relative_dir: {relative_dir}.\nMake sure the relative_dir is not an absolute path.")


def google_drive_download(filename: str, shared_url: str, relative_dir: str) -> None:
    """Download a shared Google Drive file into a sub-directory of `baseline_dir`.

    Args:
        filename (str): name to give the downloaded file.
        shared_url (str): Google Drive sharing URL (must start with 'https://drive.google.com').
        relative_dir (str): destination directory, relative to `baseline_dir`.
    """
    check_download_file_info(filename, shared_url, relative_dir, 'https://drive.google.com')

    target_dir = os.path.join(baseline_dir, relative_dir)
    os.makedirs(target_dir, exist_ok=True)

    destination = os.path.join(baseline_dir, relative_dir, filename)
    print(f"Downloading '{filename}' from gdrive to {destination}")
    gdown.download(
        url=shared_url,
        output=destination,
        quiet=False,
        fuzzy=True,
    )


def wget_download(filename: str, shared_url: str, relative_dir: str) -> None:
    """Download a file with the `wget` command-line tool, echoing its progress.

    Args:
        filename (str): name to give the downloaded file.
        shared_url (str): source URL (must start with 'https://').
        relative_dir (str): destination directory, relative to `baseline_dir`.

    The outcome is reported with a printed message based on wget's return
    code; a failed download does not raise.
    """
    check_download_file_info(filename, shared_url, relative_dir, 'https://')
    os.makedirs(os.path.join(baseline_dir, relative_dir), exist_ok=True)

    import subprocess
    from IPython.display import display, clear_output
    import time

    destination = os.path.join(baseline_dir, relative_dir, filename)
    wget_process = subprocess.Popen(
        ['wget', shared_url, '-O', destination, '-v'],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # wget's messages are merged into the single stream read below
        text=True,
    )

    while True:
        line = wget_process.stdout.readline()
        # Stop once the stream is exhausted and the process has exited.
        if line == '' and wget_process.poll() is not None:
            break
        if line:
            # Replace the previously shown line with the newest one.
            clear_output(wait=True)
            display(line.strip())
        time.sleep(0.1)  # Add a slight delay to reduce flickering

    exit_code = wget_process.poll()
    if exit_code == 0:
        print(f"Download completed successfully (filename: {filename}).")
    else:
        print(f"Download failed with return code {exit_code} (filename: {filename}).")



def git_clone_checkout(output_dir: str, url: str, branch: str, commit_sha: str) -> None:
    """Clone a repository under `baseline_dir` and check out a pinned commit.

    Args:
        output_dir (str): clone destination, relative to `baseline_dir`.
        url (str): repository URL; must start with 'https://'.
        branch (str): branch to check out first.
        commit_sha (str): commit to check out after the branch.

    Raises:
        ValueError: if `url` does not start with 'https://'.
    """
    if not url.startswith('https://'):
        raise ValueError(f"Invalid url: {url}.\nMake sure the url is valid.\nIt should start with \'https://\'")

    target_dir = os.path.join(baseline_dir, output_dir)
    os.makedirs(target_dir, exist_ok=True)

    if os.path.isdir(os.path.join(target_dir, '.git')):
        # A previous run already cloned here. Cloning again into a populated
        # directory fails, so reuse the existing working copy instead.
        repo = Repo(target_dir)
    else:
        # Clone the repository (This will clone the default branch)
        repo = Repo.clone_from(url, target_dir)

    # Checkout the specific branch
    repo.git.checkout(branch)
    # Checkout the specific commit
    repo.git.checkout(commit_sha)


def unpack_file(file_path: str, output_dir: str) -> None:
    """Unpack an archive located under `baseline_dir`.

    The archive type is chosen from the file name's suffix, so names that
    contain extra dots (e.g. 'ckpt.v1.tar') are handled as well.

    Args:
        file_path (str): archive path, relative to `baseline_dir`.
        output_dir (str): extraction directory, relative to `baseline_dir`.

    Raises:
        ValueError: if the archive does not exist or its suffix is not one of
            .tar, .tar.gz, .tar.xz or .zip.
    """
    import shutil

    archive_path = os.path.join(baseline_dir, file_path)
    if not os.path.exists(archive_path):
        raise ValueError(f"File not found: {archive_path}")

    archive_name = os.path.basename(file_path)
    print(f"Unpacking file {archive_name}...")

    # File-name suffix -> format name understood by shutil.unpack_archive.
    suffix_to_format = (
        ('.tar.gz', 'gztar'),
        ('.tar.xz', 'xztar'),
        ('.tar', 'tar'),
        ('.zip', 'zip'),
    )
    for suffix, archive_format in suffix_to_format:
        if archive_name.endswith(suffix):
            shutil.unpack_archive(archive_path, os.path.join(baseline_dir, output_dir), format=archive_format)
            return

    # Report everything after the first dot, as the error message always did.
    file_format = '.'.join(archive_name.split('.')[1:])
    raise ValueError(f'Format {file_format} is not supported. Use .tar, .tar.gz, .tar.xz, or .zip format.')

**Prepare a folder where the implementation of the baseline model will be stored:**

In [None]:
import os
# Folder under ROOT_PATH that holds the baseline implementation; the download,
# clone and unpack helpers resolve their relative paths against it.
baseline_dir:str = f'{ROOT_PATH}/baseline_implementation'
os.makedirs(baseline_dir,exist_ok=True)

**Clone the baseline code repository and download checkpoints (pretrained models and audio synthesis components):**

In [None]:
'''
DO NOT MODIFY THIS BLOCK.
'''

# Clone the baseline repository into baseline_dir and pin it to a fixed commit.
git_clone_checkout('./AudioLDM-training-finetuning', 'https://github.com/DCASE2024-Task7-Sound-Scene-Synthesis/AudioLDM-training-finetuning.git',
                   'main', 'a6b15e86c3d042832dee08a94beb11819b297e39')

# Each entry is (filename, shared_url, relative_dir). The Google Drive list is
# currently empty and its download loop below is commented out.
download_files_google_list = [
    # ('checkpoints.tar', 'https://drive.google.com/file/d/1T6EnuAHIc8ioeZ9kB1OZ_WGgwXAVGOZS/view?usp=sharing',
    #  'AudioLDM-training-finetuning/data')
]

# Files fetched with wget, in the same (filename, shared_url, relative_dir) layout.
# NOTE(review): the Dropbox link ends with dl=0 -- confirm that wget receives
# the archive itself rather than a preview page.
download_files_wget_list = [
    ('checkpoints.tar', 'https://www.dropbox.com/scl/fi/he6rqr24y1pc3s94lm8tc/checkpoints.tar?rlkey=5yu046f5uvdijq8eor77ej4fx&dl=0',
     'AudioLDM-training-finetuning/data'),
    ('audioldm-m-full.ckpt', 'https://zenodo.org/records/7884686/files/audioldm-m-full.ckpt',
     'AudioLDM-training-finetuning/data/checkpoints')
]

# for filename, shared_url, relative_dir in download_files_google_list:
#   google_drive_download(filename, shared_url, relative_dir)
for filename, shared_url, relative_dir in download_files_wget_list:
  wget_download(filename, shared_url, relative_dir)

# Extract the downloaded checkpoints archive into the repository's data folder.
unpack_file('AudioLDM-training-finetuning/data/checkpoints.tar', 'AudioLDM-training-finetuning/data')

''

Download completed successfully (filename: audioldm-m-full.ckpt).
Unpacking file checkpoints.tar...


**Define the baseline model class:**

In [None]:
'''
DO NOT MODIFY THIS BLOCK.
'''

import yaml
import torch
import importlib
import librosa
import sys
from torch.utils.data import DataLoader
from pytorch_lightning import seed_everything

sys.path.append(os.path.join(baseline_dir, 'AudioLDM-training-finetuning'))
from audioldm_train.utilities.data.dataset import AudioDataset
from audioldm_train.utilities.model_util import instantiate_from_config


def get_input_with_key(batch, k):
    """Return the entry `k` of a batch after the standard conversions.

    The batch must provide "fname", "text", "label_vector", "waveform",
    "stft" and "log_mel_spec". The derived entries are:
      - "fbank": "log_mel_spec" with a new axis inserted at position 1, as float
      - "stft" and "waveform": the batch tensors as float
      - "text": the batch text as a list
      - "fname": the batch value unchanged
    Any other key is returned exactly as stored in the batch.
    """
    # Read every required entry first so that a missing one raises KeyError
    # before any tensor work is done.
    names = batch["fname"]
    captions = batch["text"]
    _label_indices = batch["label_vector"]  # read only to enforce its presence
    wave = batch["waveform"]
    spectrogram = batch["stft"]
    log_mel = batch["log_mel_spec"]

    collected = {
        "fbank": log_mel.unsqueeze(1).to(memory_format=torch.contiguous_format).float(),
        "stft": spectrogram.to(memory_format=torch.contiguous_format).float(),
        "waveform": wave.to(memory_format=torch.contiguous_format).float(),
        "text": list(captions),
        "fname": names,
    }

    # Remaining batch entries pass through untouched; converted ones win.
    for name in batch.keys():
        if name in collected:
            continue
        collected[name] = batch[name]

    return collected[k]

def find_loudest_segment(audio: ndarray, sr: int, segment_length: int = 4, hop_length_sec: float = 2.0) -> ndarray:
    """
    Find the loudest segment in an audio waveform using Librosa's framing and RMS features.

    The annotations use `ndarray` (imported at the top of the notebook) and the
    body avoids the `np` alias, because `numpy as np` is not imported at the
    point where this function is defined.

    Parameters:
    - audio (ndarray): The audio waveform as a NumPy ndarray.
    - sr (int): The sampling rate of the audio waveform in Hz.
    - segment_length (int): The length of the segment to find in whole seconds.
    - hop_length_sec (float): The hop length for segment calculation in seconds.

    Returns:
    - ndarray: The loudest segment of the audio waveform.
    """
    hop_length_samples = int(sr * hop_length_sec)
    frame_length_samples = int(sr * segment_length)

    # One RMS value per candidate window; center=False keeps frame i starting
    # at sample i * hop_length_samples, which the slicing below relies on.
    rms_values = librosa.feature.rms(y=audio, frame_length=frame_length_samples, hop_length=hop_length_samples, center=False)

    # argmax over the flattened RMS array gives the index of the loudest frame.
    max_rms_index = int(rms_values.argmax())

    start_sample = max_rms_index * hop_length_samples
    end_sample = start_sample + frame_length_samples

    return audio[start_sample:end_sample]


class BaseLineModel(SoundSynthesisModel):
    """Baseline sound synthesis model built on the AudioLDM latent-diffusion code.

    The constructor reads the AudioLDM YAML config, points it at the downloaded
    checkpoints, seeds the run and loads the model weights.
    `synthesize_sounds` turns text prompts into mono clips of
    `self.duration` seconds at `self.sr` Hz.
    """

    def __init__(self) -> None:
        """Load the config and the pretrained latent-diffusion weights.

        Relies on the module-level names `repo_dir` and `baseline_dir`, which
        must be defined before the class is instantiated.
        """
        super().__init__()

        self.sr: int = 32000  # sampling rate
        self.duration: int = 4  # audio length in seconds
        self.batch_size: int = 8  # batch size in int
        self.loudest_hop_len: float = 2.0  # hop size for function 'find_loudest_segment' in seconds

        config_yaml_path = os.path.join(repo_dir,
                                        'audioldm_train/config/2023_08_23_reproduce_audioldm/audioldm_original_medium.yaml')
        reload_from_ckpt = os.path.join(repo_dir, 'data/checkpoints/audioldm-m-full.ckpt')
        # NOTE(review): yaml.load with FullLoader should only be given trusted
        # files; this config comes from the cloned baseline repository.
        # The context manager closes the file handle once the config is parsed.
        with open(config_yaml_path, "r") as config_file:
            self.configs = yaml.load(config_file, Loader=yaml.FullLoader)
        self.configs["reload_from_ckpt"] = reload_from_ckpt
        # The CLAP checkpoint path stored in the config is re-rooted under the
        # cloned repository inside baseline_dir.
        clap_ckpt_path = self.configs["model"]["params"]["cond_stage_config"]["film_clap_cond1"]["params"]["pretrained_path"]
        self.configs["model"]["params"]["cond_stage_config"]["film_clap_cond1"]["params"]["pretrained_path"] = os.path.join(
            baseline_dir, 'AudioLDM-training-finetuning', clap_ckpt_path)

        if "seed" in self.configs.keys():
            seed_everything(self.configs["seed"])
        else:
            print("SEED EVERYTHING TO 0")
            seed_everything(0)
        if "precision" in self.configs.keys():
            torch.set_float32_matmul_precision(self.configs["precision"])

        self.latent_diffusion = instantiate_from_config(self.configs["model"])
        checkpoint = torch.load(self.configs["reload_from_ckpt"])
        # strict=False: keys that do not match between checkpoint and model are tolerated.
        self.latent_diffusion.load_state_dict(checkpoint["state_dict"], strict=False)

    @torch.no_grad()
    def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
        """Synthesize one clip per text prompt.

        Args:
            text_prompts (list of strings): prompts to synthesize; each prompt
                becomes a key of the result, so prompts have to be unique.

        Returns:
            dict: text prompt -> waveform, resampled to `self.sr` when the model
            runs at another rate, then zero-padded or cut to exactly
            `self.sr * self.duration` samples.
        """
        # Imported locally so the method does not depend on a module-level `np`.
        import numpy as np

        # The dataset only needs the captions; no input audio is supplied.
        dataset_json = {"data": [{'wav': '', 'caption': caption} for caption in text_prompts]}

        if "dataloader_add_ons" in self.configs["data"].keys():
            dataloader_add_ons = self.configs["data"]["dataloader_add_ons"]
        else:
            dataloader_add_ons = []

        val_dataset = AudioDataset(
            self.configs, split="test", add_ons=dataloader_add_ons, dataset_json=dataset_json
        )
        val_loader = DataLoader(
            val_dataset,
            batch_size=self.batch_size,
        )

        evaluation_params = self.configs["model"]["params"]["evaluation_params"]
        guidance_scale = evaluation_params["unconditional_guidance_scale"]
        ddim_sampling_steps = evaluation_params["ddim_sampling_steps"]
        n_candidates_per_samples = evaluation_params["n_candidates_per_samples"]

        self.latent_diffusion.eval()
        self.latent_diffusion = self.latent_diffusion.cuda()

        model_sr = self.configs["variables"]["sampling_rate"]
        waveforms_dict = self.generate_sample(
            val_loader,
            unconditional_guidance_scale=guidance_scale,
            ddim_steps=ddim_sampling_steps,
            n_gen=n_candidates_per_samples,
            sampling_rate=model_sr,
        )

        target_len = self.sr * self.duration
        return_audio_dict = {}
        for text_prompt, waveform in waveforms_dict.items():
            # resample the audio if the model doesn't output 32,000Hz waveform
            if model_sr != self.sr:
                waveform = librosa.resample(waveform, orig_sr=model_sr, target_sr=self.sr)
            # pad or chop the audio if the model doesn't output 4-second audio
            if len(waveform) < target_len:
                waveform = np.pad(waveform, (0, target_len - len(waveform)), 'constant', constant_values=0)
            elif len(waveform) > target_len:
                waveform = find_loudest_segment(waveform, sr=self.sr,
                                                segment_length=self.duration, hop_length_sec=self.loudest_hop_len)
            return_audio_dict[text_prompt] = waveform

        assert len(return_audio_dict) == len(text_prompts), f"return_dict {len(return_audio_dict)} prompts {len(text_prompts)}"

        return return_audio_dict

    @torch.no_grad()
    def generate_sample(
        self,
        batchs,
        ddim_steps=200,
        ddim_eta=1.0,
        x_T=None,
        n_gen=1,
        unconditional_guidance_scale=1.0,
        unconditional_conditioning=None,
        use_plms=False,
        **kwargs,
    ) -> Dict[str, ndarray]:
        """Run the diffusion sampler over batches and collect one waveform per prompt.

        Args:
            batchs: iterable of batches (e.g. a DataLoader).
            ddim_steps: number of sampling steps; DDIM is used when this is not None.
            ddim_eta: forwarded to the sampler as `eta`.
            x_T: must be None.
            n_gen: number of candidates generated per prompt; when larger than 1
                the candidate with the highest CLAP similarity to the text is kept.
            unconditional_guidance_scale: guidance scale; when it differs from 1.0
                the unconditional conditioning is rebuilt from the condition models.
            unconditional_conditioning: used as given when the guidance scale is 1.0.
            use_plms: forwarded to the sampler; requires `ddim_steps` to be set.
            **kwargs: accepted for call compatibility and not used.

        Returns:
            dict: text prompt -> waveform with its leading axis squeezed out.

        Raises:
            ValueError: if `batchs` is not iterable.
        """
        # Imported locally so the method does not depend on a module-level `np`.
        import numpy as np

        # Generate n_gen times and select the best
        # Batch: audio, text, fnames
        assert x_T is None
        try:
            batchs = iter(batchs)
        except TypeError:
            raise ValueError("The first input argument should be an iterable object")

        if use_plms:
            assert ddim_steps is not None

        use_ddim = ddim_steps is not None

        model = self.latent_diffusion
        waveforms = {}

        with model.ema_scope("Plotting"):
            for batch in batchs:
                z, c = model.get_input(
                    batch,
                    model.first_stage_key,
                    unconditional_prob_cfg=0.0,
                )

                c = model.filter_useful_cond_dict(c)

                text = get_input_with_key(batch, "text")

                # Generate multiple samples
                n_prompts = z.shape[0]
                batch_size = n_prompts * n_gen

                # Generate multiple samples at a time and filter out the best
                # The condition to the diffusion wrapper can have many format
                for cond_key in c.keys():
                    if isinstance(c[cond_key], list):
                        for item_idx in range(len(c[cond_key])):
                            c[cond_key][item_idx] = torch.cat([c[cond_key][item_idx]] * n_gen, dim=0)
                    elif isinstance(c[cond_key], dict):
                        for k in c[cond_key].keys():
                            c[cond_key][k] = torch.cat([c[cond_key][k]] * n_gen, dim=0)
                    else:
                        c[cond_key] = torch.cat([c[cond_key]] * n_gen, dim=0)

                # Repeat the prompts so they line up with the repeated conditions.
                text = text * n_gen

                if unconditional_guidance_scale != 1.0:
                    unconditional_conditioning = {}
                    for key in model.cond_stage_model_metadata:
                        model_idx = model.cond_stage_model_metadata[key]["model_idx"]
                        unconditional_conditioning[key] = model.cond_stage_models[
                            model_idx
                        ].get_unconditional_condition(batch_size)

                fnames = list(get_input_with_key(batch, "fname"))

                samples, _ = model.sample_log(
                    cond=c,
                    batch_size=batch_size,
                    x_T=x_T,
                    ddim=use_ddim,
                    ddim_steps=ddim_steps,
                    eta=ddim_eta,
                    unconditional_guidance_scale=unconditional_guidance_scale,
                    unconditional_conditioning=unconditional_conditioning,
                    use_plms=use_plms,
                )

                mel = model.decode_first_stage(samples)

                waveform = model.mel_spectrogram_to_waveform(
                    mel, bs=None, name=fnames, save=False
                )

                if n_gen > 1:
                    try:
                        best_index = []
                        similarity = model.clap.cos_similarity(
                            torch.FloatTensor(waveform).squeeze(1), text
                        )
                        # Candidates of prompt p sit at p, p + n_prompts,
                        # p + 2 * n_prompts, ... because the batch was repeated.
                        for prompt_idx in range(n_prompts):
                            candidates = similarity[prompt_idx::n_prompts]
                            max_index = torch.argmax(candidates).item()
                            best_index.append(prompt_idx + max_index * n_prompts)

                        waveform = waveform[best_index]

                    except Exception as e:
                        print("Warning: while calculating CLAP score (not fatal), ", e)

                # Drop the repeated prompts again: one prompt per kept waveform.
                text = text[:len(text) // n_gen]
                assert len(text) == waveform.shape[0], f'{len(text)}, {waveform.shape}'
                for idx, text_prompt in enumerate(text):
                    # Membership test instead of truth-testing the stored array,
                    # which would raise an ambiguous-truth-value error.
                    assert text_prompt not in waveforms, f"Duplicate text prompt: {text_prompt}"
                    waveforms[text_prompt] = np.squeeze(waveform[idx], axis=0)

        return waveforms


# DO NOT change the working directory in your own code.
# This is only for demonstration purpose.
# repo_dir is also read by BaseLineModel.__init__ to locate the config and
# checkpoint files, so it has to be defined before the model is instantiated.
repo_dir = os.path.join(baseline_dir, 'AudioLDM-training-finetuning')
os.chdir(repo_dir)

**Execute the pipeline for generating and validating audio clips using the baseline model:**

In [None]:
import time
import IPython.display as ipd
import numpy as np


def check_srcs_dict(srcs_dict: Dict[str, np.ndarray], text_prompts_list: List[str], duration: int, SR: int) -> None:
    """ Check if the return value of synthesize_sounds method is valid.

    Args:
        srcs_dict (dict): the return value of synthesize_sounds method.
        text_prompts_list (list of strings): list of text prompts.
        duration (int): duration of the audio in seconds.
        SR (int): sample rate of the audio.

    Returns:
        None

    Raises:
        AssertionError: if the dictionary, its keys, its values or a
            waveform's shape/length does not meet the requirements.
    """
    assert isinstance(srcs_dict, dict), "The return value of synthesize_sounds method should be a dictionary."
    assert all(isinstance(k, str) for k in srcs_dict.keys()), "The keys of dictionary, the return value of \'synthesize_sounds\' method, should be strings (corresponding text prompt)."
    assert all(isinstance(v, np.ndarray) for v in srcs_dict.values()), "The values of dictionary, the return value of \'synthesize_sounds\' method, should be numpy arrays (audio waveform)."
    # Keys must match the prompts in the same order.
    assert list(srcs_dict.keys()) == text_prompts_list, "The keys of dictionary, the return value of \'synthesize_sounds\' method, should match the input text prompts."
    expected_len = int(duration * SR)
    for src in srcs_dict.values():
        assert src.ndim == 1, "The audio waveform should be mono."
        # The message reports the requested duration instead of a fixed "4".
        assert len(src) == expected_len, f"The audio waveform should be {duration} seconds long."


start_time = time.time() # measure total inference time

fss_model = BaseLineModel()

srcs_dict = fss_model.synthesize_sounds(text_prompts_list)
check_srcs_dict(srcs_dict, text_prompts_list, duration, SR)

os.makedirs(os.path.join(ROOT_PATH, baseline_dir, 'output'), exist_ok=True)
for src_text, src in tqdm.tqdm(srcs_dict.items()):
    # Each clip is stored as '<prompt>.wav' in the output folder.
    _filepath = os.path.join(ROOT_PATH, baseline_dir, 'output', f"{src_text}.wav")
    peak = np.max(np.abs(src))
    if peak > 0:
        src = src / peak # normalize the energy of the generation output
    # A silent (all-zero) clip is written unchanged: dividing by its zero peak
    # would fill the file with NaNs.
    sf.write(_filepath, src, SR, subtype='PCM_16')

inference_time = time.time() - start_time
print("Total inference time: ", inference_time)

print('Listen to the generated sound...')
print(f'- prompt: {text_prompts_list[0]}')
ipd.Audio(srcs_dict[text_prompts_list[0]], rate=SR) # listen to the generated result