# Spectrogrand: Text to AudioVisual Pipeline

## Install dependencies

In [None]:
!pip install --upgrade -r /kaggle/input/spectrogrand-public-release/kaggle-public-release/REQUIREMENTS.txt

## Create appropriate directories for outputs

In [None]:
import time

# Unix time of this run in whole seconds, used to give every run its own log directory
current_timestamp = str(int(time.time()))
# Root directory (relative to the working directory) for everything this run writes
BASE_DIR = f"./logs_{current_timestamp}"

In [None]:
# IPython shell escape: create the root output directory of this run
!mkdir -p {BASE_DIR}

In [None]:
# Output sub-directories of this run, all located under BASE_DIR
# Candidate audio clips
AUDIO_STREAM_DIR = f"{BASE_DIR}/saved_audio_stream"
# Melspectrogram of the whole selected audio clip
IMAGE_PARENT_DIR_SPEC = f"{BASE_DIR}/saved_image_parent_spec"
# Melspectrograms of the consecutive chunks of the selected audio clip
IMAGE_STREAM_DIR_SPEC = f"{BASE_DIR}/saved_image_stream_spec"
# Candidate album covers generated with SDXL
IMAGE_STREAM_DIR_SD = f"{BASE_DIR}/saved_image_stream_sd"
# Neural style transfer outputs (two separate output streams)
IMAGE_STREAM_DIR_NST_1 = f"{BASE_DIR}/saved_image_stream_nst_1"
IMAGE_STREAM_DIR_NST_2 = f"{BASE_DIR}/saved_image_stream_nst_2"

# IPython shell escapes: create every output directory up front
!mkdir -p {AUDIO_STREAM_DIR}
!mkdir -p {IMAGE_PARENT_DIR_SPEC}
!mkdir -p {IMAGE_STREAM_DIR_SPEC}
!mkdir -p {IMAGE_STREAM_DIR_SD}
!mkdir -p {IMAGE_STREAM_DIR_NST_1}
!mkdir -p {IMAGE_STREAM_DIR_NST_2}

## Initialise the models and helper functions

In [None]:
"""
    Collection of helper functions pertaining to the audio domain of spectrogrand
"""
from diffusers import AudioLDM2Pipeline
from typing import Optional, List
import scipy
import numpy as np
import librosa
import pickle

import torch
# Fix the seed of torch's default random number generator
torch.random.manual_seed(42)
# Use the first CUDA device when CUDA is available, otherwise the CPU
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"

from transformers import ClapModel, ClapProcessor


# Load the AudioLDM2 pipeline (music checkpoint) used to generate audio and move it to DEVICE
audio_ldm_pipeline = AudioLDM2Pipeline.from_pretrained("cvssp/audioldm2-music")
audio_ldm_pipeline.to(DEVICE)

# Load the CLAP model and its matching processor used to embed audio files;
# only the model is moved to DEVICE, the processor prepares the model inputs
clap_model = ClapModel.from_pretrained("laion/clap-htsat-fused")
clap_model.to(DEVICE)
clap_processor = ClapProcessor.from_pretrained("laion/clap-htsat-fused")

"""
    @method create_and_save_audio_file
        Use the `audioldm2-music` model to generate synthetic audio conditioned on inputs
    @param text_prompt: Descriptor of the audio to be generated (@note The user is requested to be as verbose as possible)
    @param output_file_path: Path to which the generated audio is to saved
    @param num_inference_steps: Number of inference steps for the audioldm2-music model (@note The higher this value, the longer the generation takes) (default: 500)
    @param audio_length: Length of the audio piece (in s) (default: 10.0)
    @param negative_prompt: Negative prompt to be passed to the audioldm2-music model (default: 'low quality, monotonous, boring')
"""
def create_and_save_audio_file(text_prompt:str, output_file_path:str, num_inference_steps:int=500, audio_length:float=10.0, negative_prompt = "low quality, monotonous, boring") -> Optional[str]:
    """
    Generate one audio clip with the module-level `audio_ldm_pipeline` and write it to a wav file.

    Returns `output_file_path` on success, or `None` (after printing the error) on any failure.
    """
    try:
        # Two waveforms are requested per prompt; only the first returned one is kept
        pipeline_output = audio_ldm_pipeline(
            text_prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=num_inference_steps,
            audio_length_in_s=float(audio_length),
            num_waveforms_per_prompt=2,
        )
        waveform = pipeline_output.audios[0]
        # The wav file is always written with a 16 kHz sample rate
        scipy.io.wavfile.write(output_file_path, rate=16000, data=waveform)
    except Exception as e:
        print(f"Error while generating and saving audio: {e}")
        return None
    return output_file_path

"""
    @method load_wav_file
        Load a wav file from a given path
    @param input_file_path: Path containing the input audio file
"""
def load_wav_file(input_file_path:str):
    """
    Read a wav file with scipy.

    Returns `(sampling_rate, data)`, or `(None, None)` (after printing the error) if the file cannot be read.
    """
    try:
        sampling_rate, samples = scipy.io.wavfile.read(input_file_path)
    except Exception as e:
        print(f"Error while reading {input_file_path}: {e}")
        return None, None
    return sampling_rate, samples
    
"""
    @method resample_audio_data
        Resample audio data to a target sampling rate
    @param origin_data: Numpy array containing the audio data (@note Intended inputs stem from the `load_wav_file` method)
    @param origin_sampling_rate: Sampling rate of the input audio (@note Intended inputs stem from the `load_wav_file` method)
    @param new_sampling_rate: Desired sampling rate (default: 48000)
"""
def resample_audio_data(origin_data:np.ndarray, origin_sampling_rate:int, new_sampling_rate:int=48000) -> Optional[np.ndarray]:
    """
    Resample audio to `new_sampling_rate`, mix it down to mono and cast it back to the input dtype.

    Returns the resampled array, or `None` (after printing the error) on any failure.
    """
    try:
        source_dtype = origin_data.dtype
        # The input is transposed before resampling and resampled as floating point
        float_samples = origin_data.T.astype('float')
        resampled = librosa.resample(float_samples, orig_sr = origin_sampling_rate, target_sr = new_sampling_rate)
        mono = librosa.to_mono(resampled)
        return np.array(mono.T.astype(source_dtype))
    except Exception as e:
        print(f"Error while resampling audio data: {e}")
        return None

"""
    @method compute_clap_embeddings
        Compute CLAP embeddings for an input audio file
    @input input_file_path: Path to the input audio file
"""
def compute_clap_embeddings(input_file_path:str) -> Optional[torch.Tensor]:
    """
    Compute the CLAP audio embedding of a wav file using the module-level CLAP model and processor.

    Returns the first row of the audio features as a CPU tensor, or `None` (after printing the error) on any failure.
    """
    try:
        clap_sampling_rate = 48000
        # Load the audio and resample it to the rate handed to the CLAP processor
        source_rate, waveform = load_wav_file(input_file_path=input_file_path)
        waveform_resampled = resample_audio_data(origin_data=waveform, origin_sampling_rate=source_rate, new_sampling_rate=clap_sampling_rate)
        # Run the processor and the model on DEVICE
        model_inputs = clap_processor(audios=waveform_resampled, sampling_rate=clap_sampling_rate, return_tensors="pt").to(DEVICE)
        audio_features = clap_model.get_audio_features(**model_inputs)
        return audio_features[0].detach().cpu()
    except Exception as e:
        print(f"Error while computing CLAP embeddings for {input_file_path}: {e}")
        return None
    
"""
    @method compute_clap_similarity
        Compute CLAP similarity for an input audio file with respect to a saved ground truth mapping of embeddings
        @input input_file_path: Path to the input audio file
        @input ground_truth_dict_path: Path to the mapping .pkl file 
        @note The ground truth mapping should be a .pkl file with the following schema:
            {
                "genre_name" : [list_of_clap_embeddings],
                ...
            }
        @input filter_genre: Genre name to compute from. If values are to be aggregated across the entire search space, this value should be left as `None`. (default: None)
"""
def compute_clap_similarity(input_file_path:str, ground_truth_dict_path:str, filter_genre:Optional[str]=None) -> Optional[float]:
    """
    Average the dot products between the CLAP embedding of an audio file and a set of reference embeddings.

    The references come from the pickled genre -> embeddings mapping at `ground_truth_dict_path`:
    one genre when `filter_genre` is given, every genre otherwise.
    Returns the mean dot product, or `None` (after printing the error) on any failure.
    """
    try:
        # NOTE(review): pickle.load can execute arbitrary code; only load mapping files from a trusted source
        with open(ground_truth_dict_path, "rb") as mapping_file:
            genre_to_embeds = pickle.load(mapping_file)

        if filter_genre is None:
            # No filter: search across the embeddings of every genre
            reference_embeds = []
            for genre_embeds in genre_to_embeds.values():
                reference_embeds.extend(genre_embeds)
        else:
            # Convert `filter_genre` into underscore format if required (eg: 'bass house')
            if "_" not in filter_genre:
                filter_genre = filter_genre.replace(" ","_")
            reference_embeds = genre_to_embeds[filter_genre]
        assert len(reference_embeds) >= 1

        # Embed the input file once and compare it against every reference
        query_embed = compute_clap_embeddings(input_file_path=input_file_path)
        total_score = sum(
            (float((query_embed@reference.T).detach().cpu()) for reference in reference_embeds),
            0.0,
        )

        # Average dot product score
        return total_score/float(len(reference_embeds))
    except Exception as e:
        print(f"Error while computing CLAP similarity score for {input_file_path}: {e}")
        return None
    
"""
    @method load_wav_chunk
        Load a time-defined chunk of audio from a wav file
    @param input_file_path: Path to the input wav file
    @param chunk_offset: Timestamp (in s) from which the chunk starts
    @param chunk_duration: Duration of the returned chunk (in s) (default: 0.1)
"""
def load_wav_chunk(input_file_path:str, chunk_offset:float, chunk_duration:float=0.1):
    """
    Load `chunk_duration` seconds of audio starting at `chunk_offset` seconds, at the file's own sampling rate.

    Returns `(sampling_rate, samples)`, or `(None, None)` (after printing the error) on any failure.
    """
    try:
        # sr=None makes librosa keep the sampling rate of the file
        samples, sampling_rate = librosa.load(path=input_file_path, offset=float(chunk_offset), duration=float(chunk_duration), sr=None)
    except Exception as e:
        print(f"Error while loading chunk from {input_file_path}: {e}")
        return None, None
    return sampling_rate, samples
    
"""
    @method create_and_save_audio_file_stream
        Use the `audioldm2-music` model to generate synthetic audio conditioned on inputs
    @param topic: Topic of the audio files to be generated
    @param output_dir: Directory to which the generated audio files are to saved
    @param num_inference_steps_list: List containing the number of inference steps (default: [500, 750])
    @param audio_length: Length of the audio piece (in s) (default: 10.0)
    @param negative_prompt: Negative prompt to be passed to the audioldm2-music model (default: 'low quality, monotonous, boring')
"""
def create_and_save_audio_file_stream(topic:str, output_dir:str, num_inference_steps_list:Optional[List[int]]=None, audio_length:float=10.0, negative_prompt = "low quality, monotonous, boring") -> Optional[List[str]]:
    """
    Generate one candidate audio clip per entry of `num_inference_steps_list` for a topic.

    `num_inference_steps_list` defaults to `[500, 750]` when left as `None` (a `None` default
    avoids sharing one mutable list object between calls). Clips are written to
    `{output_dir}/audio{i}.wav`, where `i` counts the clips saved so far.
    Returns the list of saved file paths, or `None` (after printing the error) on an unexpected failure.
    """
    try:
        if num_inference_steps_list is None:
            num_inference_steps_list = [500, 750]

        saved_output_file_names = []

        # Construct the text prompt
        text_prompt = f"Jumpy electronic house music for {topic}"

        for num_infer in num_inference_steps_list:
            # The index only advances on success, so a failed generation does not leave a gap in the numbering
            output_file = f"{output_dir}/audio{len(saved_output_file_names)}.wav"
            output_file = create_and_save_audio_file(text_prompt=text_prompt, output_file_path=output_file, num_inference_steps=num_infer, audio_length=audio_length, negative_prompt=negative_prompt)
            if output_file is not None:
                saved_output_file_names.append(output_file)

        return saved_output_file_names
    except Exception as e:
        print(f"Error while generating and saving audio stream: {e}")
        return None

In [None]:
"""
    Collection of helper functions pertaining to the image domain of spectrogrand (SDXL album covers, VILA and surprise scoring, neural style transfer)
"""
from typing import Optional, List
from PIL import Image
from io import BytesIO
import numpy as np
from glob import glob
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torchvision import transforms

# Fix the seed of torch's default random number generator for this cell as well
torch.random.manual_seed(42)

import tensorflow as tf
import tensorflow_hub as hub


# Devices for the torch and TensorFlow models of this cell: first GPU when CUDA is available, CPU otherwise
TORCH_DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"
TF_DEVICE = "/gpu:0" if torch.cuda.is_available() else "/cpu"
# Device for SDXL: a second GPU when there is one, the only GPU when there is exactly one,
# and the CPU when there is none (previously a machine without GPUs was assigned "cuda:1").
# NOTE(review): the SDXL pipeline below is loaded in float16; confirm that is usable on the CPU fallback.
if int(torch.cuda.device_count()) >= 2:
    SD_DEVICE = "cuda:1"
elif int(torch.cuda.device_count()) == 1:
    SD_DEVICE = "cuda:0"
else:
    SD_DEVICE = "cpu"

from diffusers import DiffusionPipeline

# @note These defaults can be changed based on the user's preferences
# Colours used in the album cover prompts, per genre.
# The keys are the same genre names as the values of IDX_TO_LABEL_MAPPING.
GENRE_COLOUR_MAPPING = {
    'future house' : ["blue", "red"],
    'bass house' : ["black", "purple"],
    'progressive house' : ["orange", "yellow"],
    'melodic house' : ["green", "blue"]
}

# Verbs used at the end of the album cover prompts, per genre (same keys as GENRE_COLOUR_MAPPING)
GENRE_WORD_MAPPING = {
    'future house' : ["unveils", "surprises"],
    'bass house' : ["ascends", "explodes"],
    'progressive house' : ["balances", "hypnotizes"],
    'melodic house' : ["blends", "stuns"]
}


# Load the Stable Diffusion XL pipeline (fp16 variant, safetensors weights) and move it to SD_DEVICE
sdxl_pipeline = DiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16, use_safetensors=True, variant="fp16")
sdxl_pipeline.to(SD_DEVICE)

# Load the VILA pipeline (TF Hub); predictions go through its 'serving_default' signature
vila_model = hub.load('https://tfhub.dev/google/vila/image/1')
vila_predict_fn = vila_model.signatures['serving_default']

# Load the Magenta pipeline (TF Hub arbitrary image stylization model used for neural style transfer)
magenta_model = hub.load('https://tfhub.dev/google/magenta/arbitrary-image-stylization-v1-256/2')

# Load the surprise estimation pipeline
class CreativeNet(nn.Module):
    """
    MobileNet V3 (small) baseline followed by a three-layer fully-connected head.

    The baseline parameters are frozen; the baseline's classifier is unfrozen only when
    `train_baseline_classifier` is set. The attribute names of the layers are part of the
    state dict keys, so they must stay as they are for saved weights to load.
    """
    def __init__(self, train_baseline_classifier = False, num_output_classes = 2, dropout_rate = 0.20):
        """
        @param train_baseline_classifier: Whether the parameters of the baseline's classifier stay trainable (default: False)
        @param num_output_classes: Number of outputs of the last fully-connected layer (default: 2)
        @param dropout_rate: Dropout probability used after the first two fully-connected layers (default: 0.20)
        """
        super().__init__()
        
        # Set instance variables
        self.train_baseline_classifier = train_baseline_classifier
        # (sic) the attribute name is spelled `num_outuput_classes`
        self.num_outuput_classes = num_output_classes
        self.dropout_rate = dropout_rate
        
        # Set the current device for tensor calculations
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        
        # Baseline: MobileNet V3 small
        self.baseline = models.mobilenet_v3_small(weights = models.MobileNet_V3_Small_Weights.IMAGENET1K_V1)
        
        # Freeze the parameters of the base model (including but not limited to the last layers)
        for param in self.baseline.parameters():
            param.requires_grad = False
        
        # Optionally make the baseline's classifier trainable again
        if self.train_baseline_classifier:
            for param in self.baseline.classifier.parameters():
                param.requires_grad = True
                
        # Fully-connected block: 1000 baseline outputs -> 128 -> 32 -> num_output_classes
        self.fc1 = nn.Linear(1000, 128)
        self.dropout1 = nn.Dropout(self.dropout_rate)
        self.fc2 = nn.Linear(128, 32)
        self.dropout2 = nn.Dropout(self.dropout_rate)
        self.fc3 = nn.Linear(32, self.num_outuput_classes)
        
    def forward(self, x):
        """
        Move `x` to `self.device`, run the baseline and the fully-connected block,
        and return the sigmoid of the (leaky-ReLU activated) last layer.
        """
        # Baseline
        x = x.to(self.device)
        x = self.baseline(x)
        
        # FC Block
        x = F.leaky_relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.leaky_relu(self.fc2(x))
        x = self.dropout2(x)
        # The last layer also goes through leaky ReLU before the sigmoid
        x = F.leaky_relu(self.fc3(x))
        x = torch.sigmoid(x)
        return x
    
# Constructor arguments of the surprise estimation model; note the dropout rate differs from the class default
surprise_model_args = {
        "train_baseline_classifier" : False, 
        "num_output_classes" : 2,
        "dropout_rate" : 0.35
    }
# The trained weights are loaded later, inside get_surprise_score
surprise_model = CreativeNet(**surprise_model_args).to(TORCH_DEVICE)

"""
    @method generate_and_save_image
        Generate and save an image for a text prompt 
    @param prompt: Textual prompt containg the specifics of the image to be generated
    @param output_file_path: Path to the output file where the generated image is to be stored
    @param num_inference_steps: Number of diffusion steps SDXL will take to generate the image (@note The higher this number, the longer the pipeline will take while maintaining higher-quality outputs) (default: 50)
"""
def generate_and_save_image(prompt:str, output_file_path:str, num_inference_steps:int=50) -> Optional[str]:
    """
    Generate one image for `prompt` with the module-level `sdxl_pipeline` and save it to `output_file_path`.

    Returns `output_file_path` on success, or `None` (after printing the error) on any failure.
    """
    try:
        pipeline_output = sdxl_pipeline(prompt=prompt, num_inference_steps=num_inference_steps)
        # First element of the pipeline output holds the generated images; keep the first image
        generated_images = pipeline_output[0]
        generated_images[0].save(output_file_path)
    except Exception as e:
        print(f"Error while generating image: {e}")
        return None
    return output_file_path
    
"""
    @method generate_and_save_image_stream
        Generate and save genre-driven candidate album covers
        @note To change the genre-mapping configs, rewrite GENRE_COLOUR_MAPPING and/or GENRE_WORD_MAPPING before calling this function.
    @param genre_name: Name of the genre, as it appears in the keys of GENRE_COLOUR_MAPPING and GENRE_WORD_MAPPING
    @param topic: Topic of the music piece
    @param output_dir: Path to the parent directory where the generated album cover images are to be stored
    @param num_inference_steps: Number of diffusion steps SDXL will take to generate the image (@note The higher this number, the longer the pipeline will take while maintaining higher-quality outputs) (default: 50)
"""
def generate_and_save_image_stream(genre_name:str, topic:str, output_dir:str, num_inference_steps:int=50) -> Optional[List[str]]:
    """
    Generate one candidate album cover for every (colour, word) pair configured for `genre_name`.

    Images are written to `{output_dir}/sdxl{i}.png`, where `i` counts the images saved so far.
    Returns the list of saved file paths, or `None` (after printing the error) on an unexpected
    failure such as a genre missing from the mappings.
    """
    try:
        # Grid of every colour/word combination of the genre, colours varying slowest
        prompt_parts = [
            (colour, word)
            for colour in GENRE_COLOUR_MAPPING[genre_name]
            for word in GENRE_WORD_MAPPING[genre_name]
        ]

        saved_paths = []
        for colour, word in prompt_parts:
            # The index only advances when an image was actually saved
            target_path = f"{output_dir}/sdxl{len(saved_paths)}.png"
            prompt = f"{colour} colored album cover for music about {topic} that {word}"
            written_path = generate_and_save_image(prompt=prompt,output_file_path=target_path,num_inference_steps=num_inference_steps)
            if written_path is not None:
                saved_paths.append(written_path)

        return saved_paths
    except Exception as e:
        print(f"Error while generating and saving image stream: {e}")
        return None

"""
    @method get_vila_score
        Score an image for its aesthetic qualities using the VILA model
    @param input_image_path: Path to the input image
"""
def get_vila_score(input_image_path:str) -> Optional[float]:
    """
    Score an image with the module-level VILA prediction function.

    The image is re-encoded to bytes in its own format and passed to `vila_predict_fn` on `TF_DEVICE`.
    Returns the first prediction as a float, or `None` (after printing the error) on any failure.
    """
    try:
        source_image = Image.open(input_image_path)
        # Serialise the image into an in-memory buffer @ref https://stackoverflow.com/a/33117447
        buffer = BytesIO()
        source_image.save(buffer, format=source_image.format)
        encoded_image = buffer.getvalue()
        with tf.device(TF_DEVICE):
            prediction = vila_predict_fn(tf.constant(encoded_image))
            score = prediction['predictions'][0][0]
            return float(score)
    except Exception as e:
        print(f"Error while calculating VILA score for {input_image_path}: {e}")
        return None

"""
    @method get_surprise_score
        Get the surprise coefficient from a MUMU-trained model
    @param input_image_path: Path to the input SDXL image
    @param model_path: Path to the `.pt` file containing the surprise estimation model
    @note The model will be loaded with the `model.load_state_dict(torch.load(PATH))` moniker.
"""
def get_surprise_score(input_image_path:str, model_path:str) -> Optional[float]:
    """
    Score an image with the module-level `surprise_model`.

    The image is converted to RGB and resized to 256x256, the state dict stored at `model_path`
    is loaded into `surprise_model`, and the softmax probability at index 1 of the model output
    is returned as a float.
    Returns `None` (after printing the error) on any failure.
    """
    try:
        # Transform the input image into a torch tensor @ref: https://www.projectpro.io/recipes/convert-image-tensor-pytorch
        transform = transforms.Compose([
                transforms.Resize((256, 256)),
                transforms.ToTensor(),
        ])

        img = Image.open(input_image_path).convert("RGB")
        x = transform(img).to(TORCH_DEVICE)

        # Load model; map_location keeps weights saved on a GPU loadable on a CPU-only host
        state_dict = torch.load(model_path, map_location=TORCH_DEVICE)
        surprise_model.load_state_dict(state_dict)
        surprise_model.to(TORCH_DEVICE)
        surprise_model.eval()

        # Compute outputs
        with torch.no_grad():
            outputs = surprise_model(x.unsqueeze(0))
            y = torch.softmax(outputs, dim = 1).detach().cpu()
            selected_score = float(y[0][1].item()) # Order of scores: ai, human
        return selected_score
    except Exception as e:
        # The previous handler called `e.with_traceback()` without its required argument,
        # which raised a TypeError from inside the handler instead of returning None
        print(f"Error while computing surprise score of {input_image_path}: {e}")
        return None
    
"""
    @method neural_style_transfer_vanilla
        Perform fast neural style transfer using the Magenta model
        @note ref: https://www.kaggle.com/models/google/arbitrary-image-stylization-v1/frameworks/tensorFlow1/variations/256/versions/2?tfhub-redirect=true
    @param content_image_path: Path to the input image that serves as the content image
    @param style_image_path: Path to the input image that serves as the style image
    @param output_file_path: Path to which the style transfer output is to be stored 
"""
def neural_style_transfer_vanilla(content_image_path:str, style_image_path:str, output_file_path:str) -> Optional[str]:
    """
    Stylise the content image with the style image using the module-level `magenta_model` and save the result.

    Returns `output_file_path` on success, or `None` (after printing the error) on any failure.
    """
    # Read an image file into a float32 TF tensor with a leading batch axis, scaled so its longer side is 512
    def read_as_batched_tensor(image_path):
        longest_side = 512
        raw_bytes = tf.io.read_file(image_path)
        decoded = tf.image.decode_image(raw_bytes, channels=3)
        decoded = tf.image.convert_image_dtype(decoded, tf.float32)

        # Height and width (channel axis dropped), scaled by a common factor
        height_width = tf.cast(tf.shape(decoded)[:-1], tf.float32)
        scale_factor = longest_side / max(height_width)
        target_size = tf.cast(height_width * scale_factor, tf.int32)

        resized = tf.image.resize(decoded, target_size)
        return resized[tf.newaxis, :]

    # Turn a TF tensor (optionally with a batch axis of size 1) back into a PIL image
    def to_pil_image(image_tensor):
        pixels = np.array(image_tensor*255, dtype=np.uint8)
        if np.ndim(pixels)>3:
            assert pixels.shape[0] == 1
            pixels = pixels[0]
        return Image.fromarray(pixels)

    try:
        content_tensor = read_as_batched_tensor(content_image_path)
        style_tensor = read_as_batched_tensor(style_image_path)
        # The first output of the model is the stylised image
        model_outputs = magenta_model(tf.constant(content_tensor), tf.constant(style_tensor))
        to_pil_image(model_outputs[0]).save(output_file_path)
    except Exception as e:
        print(f"Error while performing vanilla neural style transfer: {e}")
        return None
    return output_file_path
    
"""
    @method neural_style_transfer_vanilla_stream
        Perform OvA (one-vs-all) Neural style transfer with a single content/style image and a directory of style/content images
    @param single_image_path: Path to the single input image that serves as the content image
    @param stream_image_dir: Path to the directory containing the input images that serves as the content/style images
    @param ova_mode: Mode of neural style transfer. This parameter takes two values: "style" for single style image and a stream of content images; and "content" for a single content image and a stream of style images
    @param output_dir: Path to the parent directory where the style-transferred images are to be stored
"""
def neural_style_transfer_vanilla_stream(single_image_path:str, stream_image_dir:str, ova_mode:str, output_dir:str) -> Optional[List[str]]:
    """
    Run one-vs-all neural style transfer between a single image and every file of a directory.

    With `ova_mode == "style"` the single image is the style and each stream image the content;
    with `ova_mode == "content"` the roles are swapped. Any other mode generates nothing.
    The stream images are processed in natural file-name order (`x_2` before `x_10`), so the
    numbering of the outputs `{output_dir}/nst_{i}.png` follows the numbering of the inputs.
    Returns the list of saved file paths, or `None` (after printing the error) on an unexpected
    failure such as an empty stream directory.
    """
    import os
    import re

    # Sort key that compares the digit runs of a file name as numbers
    def natural_key(path):
        parts = re.split(r"(\d+)", os.path.basename(path))
        # With a capturing group, the odd positions of the split are always the digit runs
        return [int(part) if index % 2 else part for index, part in enumerate(parts)]

    try:
        # glob returns files in arbitrary order; sort them so the output order is deterministic
        stream_image_filenames = sorted(glob(f"{stream_image_dir}/*"), key=natural_key)
        if not stream_image_filenames:
            raise ValueError(f"no images found in {stream_image_dir}")

        saved_output_file_names = []
        if ova_mode not in ("style", "content"):
            print(f"Unknown ova_mode '{ova_mode}' (expected 'style' or 'content'): no images generated")
            return saved_output_file_names

        for stream_image in tqdm(stream_image_filenames):
            # The index only advances when an image was actually saved
            output_file = f"{output_dir}/nst_{len(saved_output_file_names)}.png"
            if ova_mode == "style":
                content_path, style_path = stream_image, single_image_path
            else:
                content_path, style_path = single_image_path, stream_image
            output_file = neural_style_transfer_vanilla(content_image_path=content_path,style_image_path=style_path,output_file_path=output_file)
            if output_file is not None:
                saved_output_file_names.append(output_file)

        return saved_output_file_names

    except Exception as e:
        print(f"Error while generating and saving neural style transfer image stream: {e}")
        return None

In [None]:
"""
    Collection of helper functions pertaining to the spectrogram domain of spectrogrand
"""
from typing import Optional, List
import librosa
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
from torchvision import transforms
# Fix the seed of torch's default random number generator for this cell as well
torch.random.manual_seed(42)
# Use the first CUDA device when CUDA is available, otherwise the CPU
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"

# Kept commented out: load_wav_file and load_wav_chunk are defined in an earlier cell of this notebook
# from audio_helpers import load_wav_file, load_wav_chunk

# Maps the output index of the genre classifier to a genre name (see get_classified_genre)
IDX_TO_LABEL_MAPPING = {0:'future house', 1:'bass house', 2:'progressive house', 3:'melodic house'}

"""
    @method generate_and_save_melspectrogram
        Generate and save a melspectrogram for audio data 
    @param input_data: Numpy array containing the audio to generate a spectrogram for
    @param input_sampling_rate: Sampling rate for the input audio
    @param output_file_path: Path to the output file where the generated melspectrogram is to be stored
    @param n_mels: Number of buckets used in the melspectrogram computation (default:128)
    @param hop_length: Hop length used in the melspectrogram computation (default: 512)
"""
def generate_and_save_melspectrogram(input_data:np.ndarray, input_sampling_rate:int, output_file_path:str, n_mels:int=128, hop_length:int=512) -> Optional[str]:
    """
    Compute the dB-scaled melspectrogram of `input_data` and save it as an image.

    Every call draws on its own figure and closes it afterwards, so repeated calls
    (one per audio chunk) neither stack plots on a shared figure nor leak figures.
    Returns `output_file_path` on success, or `None` (after printing the error) on any failure.
    """
    figure = None
    try:
        # Import the submodule explicitly rather than relying on `import librosa` to expose it
        import librosa.display

        # Generate melspectrogram
        melspectrum = librosa.feature.melspectrogram(
            y=input_data,
            sr=input_sampling_rate,
            hop_length=hop_length,
            window='hann',
            n_mels=n_mels
        )
        S_dB = librosa.power_to_db(melspectrum, ref=np.max)

        # Save image
        figure, axes = plt.subplots()
        librosa.display.specshow(S_dB, sr=input_sampling_rate, ax=axes)
        figure.savefig(output_file_path, bbox_inches="tight",pad_inches=-0.1) # Removing whitespace ref: https://stackoverflow.com/questions/11837979/removing-white-space-around-a-saved-image
        return output_file_path
    except Exception as e:
        print(f"Error while generating and saving melspectrogram: {e}")
        return None
    finally:
        # Release the figure whether or not saving succeeded
        if figure is not None:
            plt.close(figure)
    
"""
    @method generate_and_save_melspectrogram_stream
        Generate and save contiguous melspectrograms given a parent audio file
    @param input_file_path: Path to the input audio file
    @param output_dir: Path to the parent directory where the contiguous melspectrogram images are to be stored
    @param chunk_duration: Length of the contiguous chunks of the audio (default:0.1)
"""
def generate_and_save_melspectrogram_stream(input_file_path:str, output_dir:str, chunk_duration:float=0.1) -> Optional[List[str]]:
    """
    Save one melspectrogram image per consecutive `chunk_duration`-second chunk of an audio file.

    Only whole chunks are rendered; a trailing remainder shorter than `chunk_duration` is skipped.
    Images are written to `{output_dir}/melspec_{i}.png`, where `i` counts the images saved so far.
    Returns the list of saved file paths, or `None` (after printing the error) on failure,
    including an unreadable file or a non-positive `chunk_duration`.
    """
    try:
        # A non-positive chunk length would never advance through the file
        if chunk_duration <= 0:
            raise ValueError(f"chunk_duration must be positive, got {chunk_duration}")

        # Load the audio file to get its duration
        parent_sr, parent_data = load_wav_file(input_file_path=input_file_path)
        if parent_data is None:
            raise ValueError("the audio file could not be read")
        # scipy returns samples along the first axis, for mono and multi-channel files alike
        parent_duration = parent_data.shape[0] / float(parent_sr)

        # Number of whole chunks; the small tolerance keeps an exact multiple from losing its last chunk to rounding
        num_chunks = int(parent_duration / chunk_duration + 1e-9)

        saved_output_file_names = []
        for chunk_index in range(num_chunks):
            # Offsets are computed from the index so float error does not build up over the file
            chunk_offset = chunk_index * chunk_duration
            output_file = f"{output_dir}/melspec_{len(saved_output_file_names)}.png"
            # Load chunk data
            sr, y = load_wav_chunk(input_file_path=input_file_path,chunk_offset=chunk_offset,chunk_duration=chunk_duration)
            # Construct melspectrogram
            output_file = generate_and_save_melspectrogram(input_data=y,input_sampling_rate=sr,output_file_path=output_file)
            if output_file is not None:
                saved_output_file_names.append(output_file)

        return saved_output_file_names

    except Exception as e:
        print(f"Error while generating and saving melspectrogram stream for {input_file_path}: {e}")
        return None

"""
    @method get_classified_genre
        Get the classified genre from a HouseX-trained model
    @param input_spectrogram_path: Path to the input spectrogram image
    @param model_path: Path to the `.pth` file containing the genre classification model
    @note The model will be loaded with the `torch.load(PATH)` moniker.
"""
def get_classified_genre(input_spectrogram_path:str, model_path:str) -> Optional[str]:
    """
    Classify the genre of a spectrogram image with the model stored at `model_path`.

    The image is converted to RGB, resized to 96x96 and normalised, the whole model object is
    loaded with `torch.load`, and the index of its highest output is mapped to a genre name
    through `IDX_TO_LABEL_MAPPING`.
    Returns the genre name, or `None` (after printing the error) on any failure.
    """
    try:
        # Transform the input image into a torch tensor @ref: https://www.projectpro.io/recipes/convert-image-tensor-pytorch
        transform = transforms.Compose([
                transforms.Resize((96, 96)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        img = Image.open(input_spectrogram_path).convert('RGB')
        x = transform(img).to(DEVICE)

        # Load model; map_location keeps a model saved on a GPU loadable on a CPU-only host.
        # NOTE(review): torch.load unpickles a full model object here, which can execute arbitrary
        # code; only load model files from a trusted source. Newer torch releases may require
        # `weights_only=False` for this kind of file; confirm against the pinned torch version.
        model = torch.load(model_path, map_location=DEVICE)
        model.to(DEVICE)
        model.eval()

        # Compute outputs
        with torch.no_grad():
            outputs = model(x.unsqueeze(0))
            y = torch.softmax(outputs, dim = 1).detach().cpu()
            selected_index = int(torch.argmax(y).item())

        return IDX_TO_LABEL_MAPPING[selected_index]
    except Exception as e:
        print(f"Error while classifying genre of {input_spectrogram_path}: {e}")
        return None

## Specify the topic of your creation here

Example: `a futuristic spaceship`

In [None]:
# Topic inserted into both the audio prompt and the album cover prompts
PARENT_TOPIC = "a futuristic spaceship"

### Generate candidate audio samples and choose the most novel one

In [None]:
# One candidate clip is generated per entry of this list (each entry is a number of inference steps)
num_inference_steps_custom = [600, 750]
# Returns the list of saved wav paths, or None if the stream generation failed as a whole
saved_audio_files = create_and_save_audio_file_stream(topic = PARENT_TOPIC, output_dir = AUDIO_STREAM_DIR, num_inference_steps_list = num_inference_steps_custom, audio_length=10.0, negative_prompt = "low quality, monotonous, boring")
print(f"Generated {len(saved_audio_files)} music samlples into {AUDIO_STREAM_DIR}.")

In [None]:
GT_EMBEDDINGS_PATH = "/kaggle/input/spectrogrand-public-release/kaggle-public-release/housex_ground_truth_embeddings.pkl"
def calculate_novelty_score(input_file_path:str) -> float:
    """
    Novelty of an audio file: 1.0 minus its average CLAP similarity to all ground truth embeddings.

    Returns -1.0 (after printing the error) when the similarity could not be computed.
    """
    try:
        # filter_genre=None aggregates the similarity over every genre of the mapping
        return 1.0 - compute_clap_similarity(
            input_file_path=input_file_path,
            ground_truth_dict_path=GT_EMBEDDINGS_PATH,
            filter_genre=None,
        )
    except Exception as e:
        print(f"Error while computing novelty score: {e}")
        return -1.0

In [None]:
# Pick the candidate with the highest novelty score.
# -1.0 is the failure value of calculate_novelty_score, so it doubles as the "nothing selected yet" start value.
max_novelty_score = -1.0
selected_audio_file = None

for _f in saved_audio_files:
    novelty_score = calculate_novelty_score(input_file_path=_f)
    # Strict comparison: on a tie the earlier candidate is kept
    if novelty_score > max_novelty_score:
        max_novelty_score = novelty_score
        selected_audio_file = _f 
    
print(f"Selecting {selected_audio_file} with the novelty score of {max_novelty_score}")

# Stop the notebook here if no candidate could be scored
assert selected_audio_file is not None
assert max_novelty_score != -1.0

## Generate parent and stream spectrograms for the selected audio file and detect genre from the parent spectrogram

In [None]:
# Render the melspectrogram of the whole selected clip; its path is used for genre classification below
sr, y = load_wav_file(input_file_path=selected_audio_file)
parent_spec_path = generate_and_save_melspectrogram(input_data=y, input_sampling_rate=sr, output_file_path=f"{IMAGE_PARENT_DIR_SPEC}/parent.png")
print(f"Saved parent image spectrogram to {parent_spec_path}")

In [None]:
# Render one melspectrogram image per consecutive chunk of the selected clip (default chunk length)
saved_chunks = generate_and_save_melspectrogram_stream(input_file_path=selected_audio_file, output_dir=IMAGE_STREAM_DIR_SPEC)
# These files are spectrogram images, not music samples as the message previously said
print(f"Generated {len(saved_chunks)} spectrogram images into {IMAGE_STREAM_DIR_SPEC}.")

In [None]:
# Model file loaded by get_classified_genre
GENRE_CLF_MODEL_PATH = "/kaggle/input/spectrogrand-public-release/kaggle-public-release/resnet_101_genre_classifier.pth"
# Genre name from IDX_TO_LABEL_MAPPING, or None if the classification failed
selected_genre = get_classified_genre(input_spectrogram_path=parent_spec_path,model_path=GENRE_CLF_MODEL_PATH)
print(f"The selected audio is closest to the genre of {selected_genre}")

## Generate candidate album covers and choose the one with the best equi-weighted score of value and surprisingness

In [None]:
# Number of SDXL inference steps per candidate album cover
sd_num_inference_steps = 200
# One image per colour/word combination configured for the detected genre
saved_sd_images = generate_and_save_image_stream(genre_name=selected_genre,output_dir=IMAGE_STREAM_DIR_SD, num_inference_steps=sd_num_inference_steps, topic=PARENT_TOPIC)
print(f"Generated {len(saved_sd_images)} image samples into {IMAGE_STREAM_DIR_SD}.")

In [None]:
# State dict file that get_surprise_score loads into the surprise estimation model
SURPRISE_EST_MODEL_PATH = "/kaggle/input/spectrogrand-public-release/kaggle-public-release/mobilenet_surprise_estimation.pt"

def calculate_value_score(input_file_path:str) -> float:
    """
        @method calculate_value_score
            Return the VILA score of an image as its "value" score
        @param input_file_path: Path to the image to be scored
        @return The score reported by `get_vila_score`, or -1.0 if scoring raised (the error is printed)
    """
    try:
        return get_vila_score(input_image_path=input_file_path)
    except Exception as e:
        print(f"Error while computing value score: {e}")
    # Only reached when scoring failed: -1.0 is the failure sentinel
    return -1.0
    
def calculate_surprise_score(input_file_path:str) -> float:
    """
        @method calculate_surprise_score
            Return the "surprise" score of an image using the model at `SURPRISE_EST_MODEL_PATH`
        @param input_file_path: Path to the image to be scored
        @return The score reported by `get_surprise_score`, or -1.0 if scoring raised (the error is printed)
    """
    try:
        return get_surprise_score(input_image_path=input_file_path, model_path=SURPRISE_EST_MODEL_PATH)
    except Exception as e:
        print(f"Error while computing surprise score: {e}")
    # Only reached when scoring failed: -1.0 is the failure sentinel
    return -1.0
    
def get_album_cover_score(input_file_path:str, value_weight:float = 0.5, surprise_weight:float = 0.5) -> float:
    """
        @method get_album_cover_score
            Combine the value score and the surprise score of an image into a single weighted score
        @param input_file_path: Path to the candidate album cover image
        @param value_weight: Weight applied to the value score (default 0.5, i.e. equi-weighted)
        @param surprise_weight: Weight applied to the surprise score (default 0.5, i.e. equi-weighted)
        @return `value_weight*value_score + surprise_weight*surprise_score`, or -1.0 if either
                component score failed or an unexpected error occurred (the error is printed)
    """
    try:
        value_score = calculate_value_score(input_file_path=input_file_path)
        surprise_score = calculate_surprise_score(input_file_path=input_file_path)
        # -1.0 is the failure sentinel of both component scorers: propagate it
        # instead of blending a failure into the weighted score.
        if value_score == -1.0 or surprise_score == -1.0:
            return -1.0
        # Pass different weights if your use case values value more than surprise or vice-versa
        return value_weight*value_score + surprise_weight*surprise_score
    except Exception as e:
        print(f"Error while computing image score: {e}")
        return -1.0

In [None]:
# Pick the candidate album cover with the highest combined score.
# get_album_cover_score returns -1.0 on failure, so starting the running
# maximum at -1.0 means a failed image can never be selected.
selected_image_file, max_image_score = None, -1.0

for _f in saved_sd_images:
    image_score = get_album_cover_score(input_file_path=_f)
    # Guard clause: only a strictly better score replaces the current winner.
    if not image_score > max_image_score:
        continue
    selected_image_file, max_image_score = _f, image_score

print(f"Selecting {selected_image_file} with the equiweighted score of {max_image_score}")

# Stop the pipeline here if no image produced a usable score.
assert selected_image_file is not None
assert max_image_score != -1.0

## Generate style transfer images with the spectrogram streams and chosen album cover image

In [None]:
# Style transfer pass 1: the chosen album cover is passed as the single image
# with ova_mode="content"; the spectrogram stream supplies the other images.
ova_mode = "content"
style_transfer_outputs_single_content_image = neural_style_transfer_vanilla_stream(
    single_image_path=selected_image_file,
    output_dir=IMAGE_STREAM_DIR_NST_1,
    ova_mode=ova_mode,
    stream_image_dir=IMAGE_STREAM_DIR_SPEC,
)
print(f"Generated {len(style_transfer_outputs_single_content_image)} style transferred images keeping {ova_mode} image constant")

In [None]:
# Style transfer pass 2: the chosen album cover is passed as the single image
# with ova_mode="style"; the spectrogram stream supplies the other images.
ova_mode = "style"
style_transfer_outputs_single_style_image = neural_style_transfer_vanilla_stream(
    single_image_path=selected_image_file,
    output_dir=IMAGE_STREAM_DIR_NST_2,
    ova_mode=ova_mode,
    stream_image_dir=IMAGE_STREAM_DIR_SPEC,
)
# Report the count of THIS pass's outputs (the original printed the length of
# the "content" pass list by mistake).
print(f"Generated {len(style_transfer_outputs_single_style_image)} style transferred images keeping {ova_mode} image constant")

In [None]:
# Print the three values that must be copied before the kernel is restarted
# for the MoviePy section.
print(
    f"selected_image_file: {selected_image_file}",
    f"BASE_DIR: {BASE_DIR}",
    f"selected_audio_file: {selected_audio_file}",
    sep="\n",
)

## Create Videos using MoviePy

⚠️ Note: For dependency reasons, you must restart the kernel and follow the steps listed below to use MoviePy. Please ensure all files are stored under `/kaggle/working`  
⚠️ Before you restart the kernel, make sure you copy and save the variables `{selected_image_file}`, `{BASE_DIR}`, and `{selected_audio_file}`

For your convenience, you can paste them into this markdown cell:
```py
selected_image_file = "/kaggle/working/logs_1709970288/saved_image_stream_sd/sdxl3.png"
BASE_DIR = "/kaggle/working/logs_1709970288"
selected_audio_file = "/kaggle/working/logs_1709970288/saved_audio_stream/audio1.wav"
```

In [None]:
# Install the packages needed by the MoviePy video cells below.
!pip install --upgrade ffmpeg moviepy
# NOTE(review): decorator is pinned to 4.0.2, presumably for MoviePy compatibility - confirm.
!pip install --upgrade decorator==4.0.2
# Force a clean reinstall of moviepy after the decorator pin.
!pip install moviepy --upgrade --force-reinstall

## Reload the variables saved from earlier

In [None]:
# Example values only: replace them with the `selected_image_file`, `BASE_DIR`
# and `selected_audio_file` values printed before the kernel was restarted.
selected_image_file = "/kaggle/working/logs_1709970288/saved_image_stream_sd/sdxl3.png"
BASE_DIR = "/kaggle/working/logs_1709970288"
selected_audio_file = "/kaggle/working/logs_1709970288/saved_audio_stream/audio1.wav"

## Load helper functions

In [None]:
"""
    Collection of helper functions pertaining to the video domain of spectrogrand
"""
from moviepy.editor import AudioFileClip, ImageClip, ImageSequenceClip
from typing import Optional

"""
    @method generate_and_save_video_dynamic
        Generate and save a dynamic video for a given audio file and image folder 
    @param audio_file_path: Path to the background audio file
    @param image_dir_path: Path to the directory containing the list of images to be included in the video (@note Expected file format: *_{idx}.* - the files will be sorted based on `idx`.)
    @param output_video_path: Path to the file where the video is to be stored
"""
def generate_and_save_video_dynamic(audio_file_path:str, image_dir_path:str, output_video_path:str, fps:int = 10) -> Optional[str]:
    """
        @param fps: Frame rate of the image sequence and of the exported video (default 10)
        @return `output_video_path` on success, `None` if anything fails (the error is printed)
        @note Frames are ordered by the numeric `idx` of the `*_{idx}.*` file name
              (so `_2` comes before `_10`); names without an index are placed last, in name order.
    """
    import os
    import re

    # Sort key: (has-no-index flag, numeric idx, file name as tie-breaker)
    def _frame_order(file_name:str):
        index_match = re.search(r"_(\d+)\.[^.]*$", file_name)
        if index_match is None:
            return (1, 0, file_name)
        return (0, int(index_match.group(1)), file_name)

    try:
        # Given a directory, ImageSequenceClip sorts the file names lexicographically,
        # which puts `_10` before `_2`. Pass an explicitly ordered list instead.
        image_paths = [
            os.path.join(image_dir_path, file_name)
            for file_name in sorted(os.listdir(image_dir_path), key=_frame_order)
        ]

        # Create audio clip
        with AudioFileClip(filename=audio_file_path) as audio_clip:
            audio_duration = audio_clip.duration

            # Create image sequence clip
            with ImageSequenceClip(sequence=image_paths, fps=fps) as image_sequence_clip:
                # The video lasts exactly as long as the audio track
                image_sequence_clip = image_sequence_clip.set_duration(audio_duration)
                image_sequence_clip = image_sequence_clip.set_audio(audio_clip)
                image_sequence_clip = image_sequence_clip.set_fps(fps)

                # Export the clip
                image_sequence_clip.write_videofile(output_video_path)
                return output_video_path
    except Exception as e:
        print(f"Error while generating dynamic video clip: {e}")
        return None

"""
    @method generate_and_save_video_static
        Generate and save a static video for a given audio file and image file 
    @param audio_file_path: Path to the background audio file
    @param image_file_path: Path to the image file
    @param output_video_path: Path to the file where the video is to be stored
"""
def generate_and_save_video_static(audio_file_path:str, image_file_path:str, output_video_path:str) -> Optional[str]:
    # Returns `output_video_path` once the video is written; on any failure the
    # error is printed and None is returned.
    try:
        with AudioFileClip(filename=audio_file_path) as soundtrack:
            clip_length = soundtrack.duration

            with ImageClip(img=image_file_path) as still_frame:
                # A still image has no duration of its own, so it borrows the audio's
                # @note ref: https://stackoverflow.com/questions/75414756/combine-image-and-audio-together-using-moviepy-in-python
                video = (
                    still_frame
                    .set_duration(clip_length)
                    .set_audio(soundtrack)
                    .set_fps(10)
                )

                # Export the clip
                video.write_videofile(output_video_path)
    except Exception as e:
        print(f"Error while generating static video clip: {e}")
        return None
    return output_video_path

In [None]:
# Directory that receives the generated videos.
# NOTE: the name is misspelled ("OUTUPUT"), but the video cells below reference
# it under this exact name, so it must be renamed everywhere at once or not at all.
VIDEOS_OUTUPUT_DIR = f"{BASE_DIR}/saved_videos"
# Re-declared because the kernel was restarted: the style-transfer image
# directories that feed the dynamic videos.
IMAGE_STREAM_DIR_NST_1 = f"{BASE_DIR}/saved_image_stream_nst_1"
IMAGE_STREAM_DIR_NST_2 = f"{BASE_DIR}/saved_image_stream_nst_2"

# Create the video output directory if it does not exist yet.
!mkdir -p {VIDEOS_OUTUPUT_DIR}

## Generate static video

In [None]:
# Static video: the selected album cover shown over the selected audio.
static_video_path = generate_and_save_video_static(
    audio_file_path=selected_audio_file,
    image_file_path=selected_image_file,
    output_video_path=f"{VIDEOS_OUTUPUT_DIR}/static.mp4",
)
print(f"Generated video at {static_video_path}")

## Generate dynamic videos

In [None]:
# Dynamic video 1: frames come from the first style-transfer directory.
dynamic_video_path_constant_content_image = generate_and_save_video_dynamic(
    audio_file_path=selected_audio_file,
    image_dir_path=IMAGE_STREAM_DIR_NST_1,
    output_video_path=f"{VIDEOS_OUTUPUT_DIR}/dynamic1.mp4",
)
print(f"Generated video at {dynamic_video_path_constant_content_image}")

In [None]:
# Dynamic video 2: frames come from the second style-transfer directory.
dynamic_video_path_constant_style_image = generate_and_save_video_dynamic(
    audio_file_path=selected_audio_file,
    image_dir_path=IMAGE_STREAM_DIR_NST_2,
    output_video_path=f"{VIDEOS_OUTUPUT_DIR}/dynamic2.mp4",
)
print(f"Generated video at {dynamic_video_path_constant_style_image}")