In [None]:
!pip install tensorflow_addons

Collecting tensorflow_addons
  Downloading tensorflow_addons-0.23.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (611 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/611.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/611.8 kB[0m [31m2.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m604.2/611.8 kB[0m [31m8.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m611.8/611.8 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
Collecting typeguard<3.0.0,>=2.7 (from tensorflow_addons)
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, tensorflow_addons
Successfully installed tensorflow_addons-0.23.0 typeguard-2.13.3


In [None]:
import librosa

import numpy as np
import pandas as pd
import os

import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras import layers

import matplotlib.pyplot as plt
from IPython import display
from IPython.display import clear_output

import glob
import imageio
import time
import IPython.display as ipd

AUTOTUNE = tf.data.experimental.AUTOTUNE

from keras.saving import register_keras_serializable
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split

@register_keras_serializable()
class Resnet1DBlock(tf.keras.Model):
    """Residual block: two 1-D convolutions, each followed by normalisation,
    dropout and LeakyReLU, with the block input added back at the end.

    Args:
        kernel_size: Kernel width passed to both convolution layers.
        filters: Number of output channels of both convolution layers.
        type: ``'encode'`` builds ``Conv1D`` layers (the first with stride 2)
            with instance normalisation; ``'decode'`` builds
            ``Conv1DTranspose`` layers (stride 1) with batch normalisation.

    Raises:
        ValueError: If ``type`` is neither ``'encode'`` nor ``'decode'``.
    """

    def __init__(self, kernel_size, filters, type='encode'):
        super(Resnet1DBlock, self).__init__()

        if type == 'encode':
            self.conv1a = layers.Conv1D(filters, kernel_size, 2, padding="same")
            self.conv1b = layers.Conv1D(filters, kernel_size, 1, padding="same")
            self.norm1a = tfa.layers.InstanceNormalization()
            self.norm1b = tfa.layers.InstanceNormalization()
        elif type == 'decode':
            self.conv1a = layers.Conv1DTranspose(filters, kernel_size, 1, padding="same")
            self.conv1b = layers.Conv1DTranspose(filters, kernel_size, 1, padding="same")
            self.norm1a = tf.keras.layers.BatchNormalization()
            self.norm1b = tf.keras.layers.BatchNormalization()
        else:
            # Fail at construction time instead of returning a half-built
            # block that would only crash later inside call().
            raise ValueError(
                f"type must be 'encode' or 'decode', got {type!r}"
            )

        # Weight-free layers are built once here rather than on every call().
        self.drop1a = layers.Dropout(0.3)
        self.drop1b = layers.Dropout(0.3)
        self.drop_out = layers.Dropout(0.3)
        self.leaky_relu = layers.LeakyReLU(0.4)

    def call(self, input_tensor):
        """Apply the residual block.

        Args:
            input_tensor: Block input. It is added to the convolution branch
                output, so the two shapes must be broadcast-compatible.

        Returns:
            ``relu(dropout(branch(input_tensor) + input_tensor))``.
        """
        x = tf.nn.relu(input_tensor)

        x = self.conv1a(x)
        x = self.norm1a(x)
        x = self.leaky_relu(self.drop1a(x))

        x = self.conv1b(x)
        x = self.norm1b(x)
        x = self.leaky_relu(self.drop1b(x))

        # Residual connection.
        x += input_tensor
        return tf.nn.relu(self.drop_out(x))

from tensorflow.keras import layers, Model
class CVAE(tf.keras.Model):
    """Convolutional variational autoencoder.

    The encoder maps inputs of shape ``(batch, 1, input_size)`` to a vector of
    size ``2 * latent_dim`` (mean and log-variance concatenated); the decoder
    maps a latent vector of size ``latent_dim`` back to
    ``(batch, 1, input_size)``.

    Args:
        latent_dim: Size of the latent vector.
        input_size: Number of channels of the (length-1) input sequence.
    """

    def __init__(self, latent_dim, input_size):
        super(CVAE, self).__init__()
        self.latent_dim = latent_dim
        self.input_size = input_size
        self.encoder = self.build_encoder(self.latent_dim,self.input_size)
        self.decoder = self.build_decoder(self.latent_dim,self.input_size)
        # Running-mean trackers returned by get_metrics(). They were referenced
        # but never created, so get_metrics() raised AttributeError.
        self.total_loss_tracker = tf.keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")

    def build_encoder(self,latent_dim, input_size):
        """Build the encoder: (batch, 1, input_size) -> (batch, 2 * latent_dim).

        NOTE(review): Resnet1DBlock's signature is (kernel_size, filters, type),
        so ``Resnet1DBlock(64, 1)`` creates a block with kernel_size=64 and
        filters=1 (its residual add then broadcasts over channels). Left as-is
        because saved weights depend on it -- confirm this is intended.
        """
        inputs = layers.Input(shape=(1, input_size))
        x = layers.Conv1D(64, 1, strides=2)(inputs)
        x = Resnet1DBlock(64, 1)(x)
        x = layers.Conv1D(128, 1, strides=2)(x)
        x = Resnet1DBlock(128, 1)(x)
        x = layers.Conv1D(128, 1, strides=2)(x)
        x = Resnet1DBlock(128, 1)(x)
        x = layers.Conv1D(256, 1, strides=2)(x)
        x = Resnet1DBlock(256, 1)(x)
        """
        #add more
        x = layers.Conv1D(256, 1, strides=2)(x)
        x = Resnet1DBlock(256, 1)(x)
        x = layers.Conv1D(512, 1, strides=2)(x)
        x = Resnet1DBlock(512, 1)(x)
        x = layers.Conv1D(512, 1, strides=2)(x)
        x = Resnet1DBlock(512, 1)(x)
        x = layers.Conv1D(1024, 1, strides=2)(x)
        x = Resnet1DBlock(1024, 1)(x)

        ##END
        """


        # No activation
        x = layers.Flatten()(x)
        outputs= layers.Dense(latent_dim+latent_dim)(x)
        """
        mean = layers.Dense(latent_dim)(x)
        log_var = layers.Dense(latent_dim)(x)

        """


        #return Model(inputs,[mean, log_var])
        return Model(inputs, outputs)

    def build_decoder(self,latent_dim, input_size):
        """Build the decoder: (batch, latent_dim) -> (batch, 1, input_size) logits."""
        inputs = layers.Input(shape=(latent_dim,))
        x = layers.Reshape(target_shape=(1, latent_dim))(inputs)
        """
        #add more  1024
        x = Resnet1DBlock(2048, 1, 'decode')(x)
        x = layers.Conv1DTranspose(1024, 1, 1)(x)
        x = Resnet1DBlock(2048, 1, 'decode')(x)
        x = layers.Conv1DTranspose(1024, 1, 1)(x)

        ##END
        """
        x = Resnet1DBlock(512, 1, 'decode')(x)
        x = layers.Conv1DTranspose(512, 1, 1)(x)
        x = Resnet1DBlock(256, 1, 'decode')(x)
        x = layers.Conv1DTranspose(256, 1, 1)(x)
        x = Resnet1DBlock(128, 1, 'decode')(x)
        x = layers.Conv1DTranspose(128, 1, 1)(x)
        x = Resnet1DBlock(64, 1, 'decode')(x)
        x = layers.Conv1DTranspose(64, 1, 1)(x)
        outputs = layers.Conv1DTranspose(input_size, 1, 1)(x)
        return Model(inputs, outputs)

    def get_metrics(self):
        """Return the total, reconstruction and KL loss trackers, in that order."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs):
        """Call the model on a particular input.

        Returns:
            Tuple ``(z_mean, z_log_var, z, reconstruction)``.
        """
        z_mean, z_log_var = self.encode(inputs)
        z = self.reparameterize(z_mean, z_log_var)
        reconstruction= self.decode(z)
        return z_mean, z_log_var, z, reconstruction

    def update(self,encoder,decoder):
        """Replace the encoder and decoder sub-models.

        Deliberately NOT wrapped in ``@tf.function``: Python attribute
        assignment is a side effect that only executes while a tf.function is
        being traced, so a repeated call with already-seen arguments would
        silently skip the assignment.
        """
        self.encoder = encoder
        self.decoder = decoder

    @tf.function
    def sample(self, eps=None):
        """Decode latent vectors to sigmoid probabilities.

        Args:
            eps: Latent vectors to decode. When ``None``, 200 vectors are drawn
                from a standard normal distribution.
        """
        if eps is None:
            eps = tf.random.normal(shape=(200, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    @tf.function
    def encode(self, x,training= False ):
        """Run the encoder and split its output into ``(mean, logvar)`` halves."""
        #mean, logvar = self.encoder(x)
        #mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        mean, logvar = tf.split(self.encoder(x,training=training), num_or_size_splits=2, axis=1)
        return mean, logvar

    @tf.function
    def reparameterize(self, mean, logvar):
        """Sample ``z = mean + eps * exp(logvar / 2)`` with ``eps ~ N(0, I)``."""
        eps = tf.random.normal(shape=tf.shape(mean))#eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * .5) + mean

    @tf.function
    def decode(self, z, apply_sigmoid=False ,training= False ):
        """Run the decoder; return logits, or probabilities if ``apply_sigmoid``."""
        #logits = self.decoder(z)
        logits = self.decoder(z,  training=training)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits

    # Disabled draft of custom train_step/test_step, kept for reference. It
    # depends on names that are not defined in this class (log_normal_pdf, beta).
    """
    def train_step(self, data):

       #Step run during training
        x= data
        with tf.GradientTape() as tape:
            # FOWARD
            #mean, logvar, x_logit ,z = self(x, training=True)
            mean, logvar = self.encode(x)
            z = self.reparameterize(mean, logvar)
            x_logit = self.decode(z)

            cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
            #KL_loss COMPUTE
            logpx_z = -tf.reduce_sum(cross_ent, axis=[1,2])
            logpz = log_normal_pdf(z, 0., 0.)
            logqz_x = log_normal_pdf(z, mean, logvar)
            loss_KL = -tf.reduce_mean(logpx_z + logpz - logqz_x)
            #reconstruction_loss COMPUTE
            reconstruction_loss = tf.reduce_mean(
                     tf.keras.losses.binary_crossentropy(x, x_logit)
                 )
            total_loss = reconstruction_loss+ loss_KL * beta

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        #gradients = tape.gradient(total_loss, model.trainable_variables)
        #optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
    def test_step(self, data):
        #Step run during validation.
        if isinstance(data, tuple):
            data = data[0]

        x= data

        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        x_logit = self.decode(z)

       # mean, logvar, x_logit,z = self(x)

        cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
        # KL
        logpx_z = -tf.reduce_sum(cross_ent, axis=[1,2])
        logpz = log_normal_pdf(z, 0., 0.)
        logqz_x = log_normal_pdf(z, mean, logvar)
        loss_KL = -tf.reduce_mean(logpx_z + logpz - logqz_x)
        # Reconstruction
        reconstruction_loss = tf.reduce_mean(
                             tf.keras.losses.binary_crossentropy(x, x_logit)
                         )
        total_loss = reconstruction_loss+ loss_KL
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss":loss_KL,
        }

        """


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [None]:
### Load the saved "inception" model (used below as the feature encoder for the FID score)
import time
import numpy as np
from keras.models import load_model, save_model

# NOTE(review): machine-specific absolute path; it is only referenced by the
# commented-out save call below.
model_path = '/Users/fourold/Desktop/CS782/Assignment1/src'
# Presumably the call that produced the file loaded below (a CVAE encoder) -- TODO confirm.
#save_model(model.encoder, model_path + f'/inception_model_(L20_I{input_size}).keras')
# NOTE(review): if the saved model contains Resnet1DBlock layers, the cell that
# defines and registers that class must have been run before this load -- confirm.
inception =load_model('/content/inception_model_(L20_I90000).keras')

In [None]:
# You can use S4 via S4(d_model, l_max=L, bidirectional=True).
# d_model is the number of input channels (1 here); L is the audio length in samples (90000 in this notebook).
"""Minimal version of S4D with extra options and features stripped out, for pedagogical purposes."""

import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange, repeat
class Conv(nn.Module):
    """Weight-normalised 1-D convolution with Kaiming-normal initial weights.

    Padding is ``dilation * (kernel_size - 1) // 2``, so the output length
    equals the input length whenever ``dilation * (kernel_size - 1)`` is even.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        kernel_size: Width of the convolution kernel.
        dilation: Dilation factor of the kernel.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, dilation=1):
        super(Conv, self).__init__()
        self.padding = dilation * (kernel_size - 1) // 2
        conv = nn.Conv1d(in_channels, out_channels, kernel_size, dilation=dilation, padding=self.padding)
        # Initialise BEFORE applying weight norm. Afterwards `.weight` is a
        # tensor derived from weight_g / weight_v and recomputed before every
        # forward pass, so an in-place init on it would be thrown away.
        nn.init.kaiming_normal_(conv.weight)
        self.conv = nn.utils.weight_norm(conv)

    def forward(self, x):
        """Apply the convolution to ``x`` of shape (batch, in_channels, length)."""
        return self.conv(x)
#from src.models.nn import DropoutNd
class TransposedLN(nn.Module):
    """Layer normalisation over the second-to-last (channel) axis with a
    single learnable scalar shift ``m`` and scalar scale ``s``.

    Args:
        d: Number of channels. Unused by the scalar parametrisation; kept for
            interface compatibility.
        eps: Lower bound applied to the standard deviation before dividing.
            It only takes effect when the std is (almost) zero, e.g. for a
            single channel or a constant input, which previously yielded
            NaN/inf.
    """

    def __init__(self, d, eps=1e-12):
        super().__init__()
        self.eps = eps
        self.m = nn.Parameter(torch.zeros(1))
        self.s = nn.Parameter(torch.ones(1))

    def forward(self, x):
        """Normalise ``x`` along ``dim=-2`` and apply the scalar shift/scale."""
        s, m = torch.std_mean(x, dim=-2, unbiased=False, keepdim=True)
        # Guard against division by zero without altering well-conditioned inputs.
        s = s.clamp_min(self.eps)
        y = (self.s/s) * (x-m+self.m)
        return y

class FF(nn.Module):
    """Position-wise feed-forward stack: kernel-size-1 Conv that widens the
    channels by ``expand``, GELU, then a kernel-size-1 Conv back to ``d_model``.
    """

    def __init__(self, d_model, expand=2):
        super().__init__()
        hidden_width = expand * d_model

        # Sub-modules are created in expand -> activation -> project order.
        self.ff = nn.Sequential(
            Conv(d_model, hidden_width, 1),
            nn.GELU(),
            Conv(hidden_width, d_model, 1),
        )

    def forward(self, x, *args, **kwargs):
        """Apply the feed-forward stack; extra arguments are accepted and ignored."""
        return self.ff(x)


class S4DBlock(nn.Module):
    """Residual block: norm -> Conv(k=1) expand -> GELU -> Conv(k=1) project,
    added back onto the input.

    ``norm1`` and ``s4`` are constructed but not used by ``forward``: the
    S4 sub-block (norm1 -> s4 -> gelu -> residual) is currently disabled.
    """

    def __init__(self, d_model, expand=2):
        super().__init__()
        hidden_width = expand * d_model

        # Attribute names double as state_dict keys, so they are kept stable.
        self.norm1 = TransposedLN(d_model)
        self.norm2 = TransposedLN(d_model)
        self.s4 = S4D(d_model, d_state=20, dropout = 0.0)
        self.linear2 = Conv(d_model, hidden_width, 1)
        self.linear3 = Conv(hidden_width, d_model, 1)
        self.gelu = nn.GELU()

    def forward(self, x):
        """Return ``x + linear3(gelu(linear2(norm2(x))))``."""
        branch = self.norm2(x)
        branch = self.linear3(self.gelu(self.linear2(branch)))
        return x + branch
class S4DKernel(nn.Module):
    """Generate convolution kernel from diagonal SSM parameters."""

    def __init__(self, d_model, N=64, dt_min=0.001, dt_max=0.1, lr=None):
        """Create the kernel parameters.

        Args:
            d_model: Number of independent channels H.
            N: State size; N // 2 complex values are stored per channel.
            dt_min: Lower bound of the initial step size.
            dt_max: Upper bound of the initial step size.
            lr: Forwarded to ``register`` for log_dt, log_A_real and A_imag.
        """
        super().__init__()
        # Generate dt: log_dt is drawn uniformly in [log(dt_min), log(dt_max))
        H = d_model
        log_dt = torch.rand(H) * (
            math.log(dt_max) - math.log(dt_min)
        ) + math.log(dt_min)

        # C is complex (H, N//2); stored as a real tensor with a trailing dim of 2
        C = torch.randn(H, N // 2, dtype=torch.cfloat)
        self.C = nn.Parameter(torch.view_as_real(C))
        self.register("log_dt", log_dt, lr)

        # A starts at -0.5 + i*pi*n for n = 0 .. N//2 - 1, identical for every channel
        log_A_real = torch.log(0.5 * torch.ones(H, N//2))
        A_imag = math.pi * repeat(torch.arange(N//2), 'n -> h n', h=H)
        self.register("log_A_real", log_A_real, lr)
        self.register("A_imag", A_imag, lr)

    def forward(self, L):
        """
        Build the kernel for a sequence of length L.

        returns: real tensor of shape (H, L), as fixed by the einsum below
        """

        # Materialize parameters
        dt = torch.exp(self.log_dt) # (H)
        C = torch.view_as_complex(self.C) # (H, N//2)
        A = -torch.exp(self.log_A_real) + 1j * self.A_imag # (H, N//2); real part is always negative

        # Vandermonde multiplication
        dtA = A * dt.unsqueeze(-1)  # (H, N//2)
        K = dtA.unsqueeze(-1) * torch.arange(L, device=A.device) # (H, N//2, L): exponent dt*A*l
        # Scale C by (exp(dt*A) - 1) / A before summing over the state dimension
        C = C * (torch.exp(dtA)-1.) / A
        # NOTE(review): the factor 2 with .real presumably accounts for
        # conjugate-pair states -- confirm against the S4D reference.
        K = 2 * torch.einsum('hn, hnl -> hl', C, torch.exp(K)).real

        return K

    def register(self, name, tensor, lr=None):
        """Register a tensor with a configurable learning rate and 0 weight decay"""

        if lr == 0.0:
            # lr == 0.0: stored as a buffer, i.e. not a trainable parameter
            self.register_buffer(name, tensor)
        else:
            self.register_parameter(name, nn.Parameter(tensor))

            # Attach an `_optim` dict to the parameter; presumably read by the
            # training code when building optimizer groups -- verify.
            optim = {"weight_decay": 0.0}
            if lr is not None: optim["lr"] = lr
            setattr(getattr(self, name), "_optim", optim)


class S4D(nn.Module):
    """Diagonal state-space layer: FFT convolution with an ``S4DKernel``, a
    learned per-channel skip term ``D``, GELU, optional dropout, and a
    kernel-size-1 Conv1d + GLU output projection.

    Args:
        d_model: Number of channels H.
        d_state: State size passed to the kernel as ``N``.
        dropout: Dropout probability; values <= 0 disable dropout.
        transposed: If True, inputs/outputs are (B, H, L); otherwise the last
            two axes are swapped on the way in and out.
        **kernel_args: Extra keyword arguments forwarded to ``S4DKernel``.
    """

    def __init__(self, d_model, d_state=64, dropout=0.0, transposed=True, **kernel_args):
        super().__init__()

        self.h = d_model
        self.n = d_state
        self.d_output = self.h
        self.transposed = transposed

        # Per-channel weight of the direct input-to-output (skip) path.
        self.D = nn.Parameter(torch.randn(self.h))

        # Generator of the convolution kernel.
        self.kernel = S4DKernel(self.h, N=self.n, **kernel_args)

        # Pointwise non-linearity and (optional) dropout.
        self.activation = nn.GELU()
        if dropout > 0.0:
            self.dropout = nn.Dropout(dropout)
        else:
            self.dropout = nn.Identity()

        # Position-wise channel mixing: expand to 2H, then GLU gates back to H.
        self.output_linear = nn.Sequential(
            nn.Conv1d(self.h, 2*self.h, kernel_size=1),
            nn.GLU(dim=-2),
        )

    def forward(self, u, **kwargs):
        """Input and output shape (B, H, L); extra keyword arguments are ignored."""
        if not self.transposed:
            u = u.transpose(-1, -2)
        seq_len = u.size(-1)
        fft_len = 2 * seq_len

        ssm_kernel = self.kernel(L=seq_len)  # (H, L)

        # Multiply in the frequency domain with FFT length 2L, then keep the
        # first L output samples.
        kernel_freq = torch.fft.rfft(ssm_kernel, n=fft_len)
        input_freq = torch.fft.rfft(u, n=fft_len)
        conv_out = torch.fft.irfft(input_freq*kernel_freq, n=fft_len)[..., :seq_len]

        # Skip connection scaled per channel by D.
        conv_out = conv_out + u * self.D.unsqueeze(-1)

        out = self.output_linear(self.dropout(self.activation(conv_out)))
        if not self.transposed:
            out = out.transpose(-1, -2)
        return out
import torch
import torch.nn as nn

class S4_AE(nn.Module):
    """Convolutional autoencoder with S4D layers interleaved.

    Down path (``d_layer``): stride-2 Conv1d stages 1->2->4->8->16 channels,
    with an S4D(2) inserted after the first convolution.
    Up path (``u_layer``): stride-2 ConvTranspose1d stages 16->8->4->2->1
    channels, with S4D(2) after the 4->2 stage and S4D(1) after the 2->1 stage.

    Args:
        input_dim: Unused.
    """

    def __init__(self, input_dim=None):
        super().__init__()
        widths = [1 << exponent for exponent in range(4)]  # 1, 2, 4, 8

        down_stages = []
        for position, width in enumerate(widths):
            if position == 1:
                down_stages.append(S4D(2, d_state=10, dropout=0.2))
            down_stages.append(nn.Conv1d(width, 2 * width, 2, stride=2))
        self.d_layer = nn.ModuleList(down_stages)

        # Position in the up path -> channel count of the S4D placed right after it.
        s4d_after_upsample = {2: 2, 3: 1}
        up_stages = []
        for position, width in enumerate(reversed(widths)):
            up_stages.append(nn.ConvTranspose1d(2 * width, width, 2, stride=2))
            if position in s4d_after_upsample:
                up_stages.append(S4D(s4d_after_upsample[position], d_state=10, dropout=0.2))
        self.u_layer = nn.ModuleList(up_stages)

    def forward(self, x):
        """Run ``x`` through every down stage, then every up stage."""
        for stage in [*self.d_layer, *self.u_layer]:
            x = stage(x)
        return x

class S4D_AE(nn.Module):
    """One-level S4D autoencoder for inputs of shape (B, 1, 90000).

    The deeper level (``d2``, ``norm_2``, ``s4d_4``, ``u1``) is constructed, so
    its parameters are part of the state_dict, but ``forward`` does not use it.

    Args:
        input_dim: Unused; the layer-norm sizes are fixed at 90000/45000/22500.
    """

    def __init__(self, input_dim=None):
        super().__init__()
        # Attribute names and creation order are kept stable: they determine
        # the state_dict keys and the order of random initialisation.
        self.s4d_1 = S4D(1, d_state=10, dropout=0.0)
        self.s4d_2 = S4D(2, d_state=10, dropout=0.0)
        self.s4d_4 = S4D(4, d_state=64, dropout=0.2)
        self.norm = nn.LayerNorm(90000)
        self.d1 = nn.Conv1d(1, 2, 2, stride=2)
        self.norm_1 = nn.LayerNorm(45000)
        self.d2 = nn.Conv1d(2, 4, 2, stride=2)
        self.norm_2 = nn.LayerNorm(22500)
        self.u1 = nn.ConvTranspose1d(4, 2, 2, stride=2)
        self.u2 = nn.ConvTranspose1d(2, 1, 2, stride=2)
        self.gelu = nn.GELU()

    def forward(self, x):
        """Encode (B, 1, 90000) down to (B, 2, 45000) and decode back."""
        down_path = (
            self.norm,
            self.s4d_1,   # (B, H, L) -> (B, H, L)
            self.d1,      # (B, 1, 90000) -> (B, 2, 45000)
            self.norm_1,
            self.s4d_2,   # (B, H, L) -> (B, H, L)
            self.gelu,
            self.norm_1,
        )
        # Disabled deeper level, which would sit between the two paths:
        #   d2 (2, 45000 -> 4, 22500), norm_2, s4d_4,
        #   u1 (4, 22500 -> 2, 45000), norm_1, s4d_2
        up_path = (
            self.u2,      # (B, 2, 45000) -> (B, 1, 90000)
            self.norm,
            self.s4d_1,   # (B, H, L) -> (B, H, L)
            self.gelu,
        )
        for stage in down_path + up_path:
            x = stage(x)
        return x


In [None]:
#!pip install einops

Collecting einops
  Downloading einops-0.7.0-py3-none-any.whl (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.6/44.6 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.7.0


In [None]:
# Build the S4D autoencoder with freshly initialised weights; the trained
# weights are loaded from a checkpoint in the following cell.
S4D_model = S4D_AE()

In [None]:
import torch

# Path of the saved state_dict to restore into S4D_model.
file_path = '/content/S4_model_classical.pth'
# map_location="cpu" lets a checkpoint saved on a GPU be loaded on a CPU-only
# runtime; the model above is built on the CPU, so its weights end up there
# either way.
S4D_model.load_state_dict(torch.load(file_path, map_location="cpu"))

<All keys matched successfully>

In [None]:
#Load the data

In [None]:
import json
from pathlib import Path

# Provide your Kaggle API credentials
kaggle_json = {
    "username": "vanfourold",
    "key": "5b95c8e0e1303081f57adc898b9ed23f"
}

# Write Kaggle API credentials to a file
kaggle_json_path = Path("kaggle.json")
with open(kaggle_json_path, "w") as f:
    json.dump(kaggle_json, f)

# Move kaggle.json to the correct location
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Install Kaggle library
!pip install -q kaggle

# Download dataset (replace 'dataset-name' with the desired dataset name from Kaggle)
!kaggle datasets download -d andradaolteanu/gtzan-dataset-music-genre-classification


# Unzip the dataset
!unzip gtzan-dataset-music-genre-classification.zip

Downloading gtzan-dataset-music-genre-classification.zip to /content
 99% 1.20G/1.21G [00:19<00:00, 94.2MB/s]
100% 1.21G/1.21G [00:19<00:00, 67.7MB/s]
Archive:  gtzan-dataset-music-genre-classification.zip
  inflating: Data/features_30_sec.csv  
  inflating: Data/features_3_sec.csv  
  inflating: Data/genres_original/blues/blues.00000.wav  
  inflating: Data/genres_original/blues/blues.00001.wav  
  inflating: Data/genres_original/blues/blues.00002.wav  
  inflating: Data/genres_original/blues/blues.00003.wav  
  inflating: Data/genres_original/blues/blues.00004.wav  
  inflating: Data/genres_original/blues/blues.00005.wav  
  inflating: Data/genres_original/blues/blues.00006.wav  
  inflating: Data/genres_original/blues/blues.00007.wav  
  inflating: Data/genres_original/blues/blues.00008.wav  
  inflating: Data/genres_original/blues/blues.00009.wav  
  inflating: Data/genres_original/blues/blues.00010.wav  
  inflating: Data/genres_original/blues/blues.00011.wav  
  inflating: Data/g

In [None]:
import os
import librosa
import numpy as np

data_path = "Data/genres_original/classical"  # Folder holding the .wav clips to load
SR = 3000  # Target sampling rate in Hz; every clip is resampled to it on load


def load_raw_audio(data_path, sr=None):
    """Load every ``.wav`` file in a folder as a waveform array.

    Args:
        data_path: Directory to scan (non-recursive).
        sr: Sampling rate to resample to. Defaults to the module-level ``SR``.

    Returns:
        List of arrays as returned by ``librosa.load``, one per file, ordered
        by file name.
    """
    target_sr = SR if sr is None else sr
    wav_files = []
    # os.listdir returns entries in arbitrary order; sorting keeps the list
    # (and therefore the seeded train/test split built from it) reproducible.
    for filename in sorted(os.listdir(data_path)):
        if filename.endswith(".wav"):
            filepath = os.path.join(data_path, filename)
            samples, _ = librosa.load(filepath, sr=target_sr)  # resampled to target_sr
            wav_files.append(samples)
    return wav_files

# Load raw audio data
data = load_raw_audio(data_path)


In [None]:

from torch.utils.data import Dataset, DataLoader
class AudioDataset(Dataset):
    """Dataset of 1-D waveforms returned as fixed-length ``(1, length)`` float32 tensors.

    Args:
        data: Indexable collection in which each item is one 1-D waveform.
        length: Number of samples per returned item; longer waveforms are
            truncated and shorter ones are zero-padded at the end.
    """

    def __init__(self, data, length=90000):
        self.data = data
        self.length = length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        waveform = torch.tensor(self.data[idx], dtype=torch.float32)
        shortfall = self.length - len(waveform)
        if shortfall > 0:
            # Too short: append zeros up to the target length.
            waveform = torch.cat((waveform, torch.zeros(shortfall)))
        elif shortfall < 0:
            # Too long: keep only the leading samples.
            waveform = waveform[:self.length]
        # Prepend the channel axis -> (1, length).
        return waveform.unsqueeze(0)

from sklearn.model_selection import train_test_split

# Hold out 20% of the clips for testing; random_state fixes the shuffle used by the split.
train_data , test_data = train_test_split(data, test_size=0.2, random_state=42)
# Wrap each split in an AudioDataset (default length of 90000 samples per item)
train_dataset = AudioDataset(train_data)
test_dataset = AudioDataset(test_data)

# Create a DataLoader
def load_data(dataset, batch_size=1, shuffle=True):
    """Wrap ``dataset`` in a ``DataLoader`` with the given batch size and shuffling."""
    loader_options = {"batch_size": batch_size, "shuffle": shuffle}
    return DataLoader(dataset, **loader_options)

# Batched loaders for the two splits: the training loader reshuffles on every
# pass, the test loader keeps the dataset order.
train_loader = load_data(train_dataset, batch_size=10, shuffle=True)  # batches of 10 clips, shuffled
test_loader = load_data(test_dataset, batch_size=10, shuffle=False)  # batches of 10 clips, fixed order


In [None]:
import torch

def get_output(data, model):
    """Run ``model`` over every batch of ``data`` without tracking gradients.

    Args:
        data: Iterable of input batches (e.g. a DataLoader).
        model: Module to evaluate; it is switched to eval mode.

    Returns:
        Tuple ``(outputs, inputs)``: the model outputs and the batches that
        produced them, each concatenated along dimension 0 in iteration order.
    """
    model.eval()
    predictions, originals = [], []
    with torch.no_grad():
        for batch in data:
            originals.append(batch)
            predictions.append(model(batch))
    stacked_outputs = torch.cat(predictions, dim=0)
    stacked_inputs = torch.cat(originals, dim=0)
    return stacked_outputs, stacked_inputs



In [None]:
# Run the restored autoencoder over both splits; each call returns
# (model outputs, the inputs that produced them), concatenated over batches.
train_outputs ,train_sample = get_output(train_loader, S4D_model)
test_outputs,test_sample = get_output(test_loader, S4D_model)


In [None]:
import numpy as np
from numpy import cov
from numpy import trace
from numpy import iscomplexobj
from scipy.linalg import sqrtm
from keras.models import Model

# calculate frechet inception distance using encoder of VAE model

def calculate_fid(encoder, audio1, audio2):
    """Frechet distance between two sets of audio, measured in encoder feature space.

    Args:
        encoder: Object whose ``predict(batch)`` returns one feature row per sample.
        audio1: First batch of audio.
        audio2: Second batch of audio.

    Returns:
        ``||mu1 - mu2||^2 + trace(sigma1 + sigma2 - 2 * sqrt(sigma1 . sigma2))``
        where ``mu`` / ``sigma`` are the feature mean and covariance of each set.
    """

    def _feature_stats(batch):
        # Encode the batch, then summarise it by feature mean and covariance
        # (rows are samples, columns are features).
        features = encoder.predict(batch)
        return features.mean(axis=0), np.cov(features, rowvar=False)

    mean_a, cov_a = _feature_stats(audio1)
    mean_b, cov_b = _feature_stats(audio2)

    # Squared Euclidean distance between the two means.
    mean_gap = np.sum((mean_a - mean_b)**2.0)

    # Matrix square root of the covariance product; numerical error can leave
    # a complex result, in which case only the real part is kept.
    cov_sqrt = sqrtm(np.dot(cov_a, cov_b))
    if np.iscomplexobj(cov_sqrt):
        cov_sqrt = cov_sqrt.real

    return mean_gap + np.trace(cov_a + cov_b - 2.0 * cov_sqrt)

In [None]:
### Test
# Sanity check: a batch compared with itself should score ~0 (the recorded
# result is a tiny negative number, i.e. floating-point noise).
calculate_fid(inception , train_outputs.numpy()[:20], train_outputs.numpy()[:20])



-2.1853356702147483e-08

In [None]:
# Distance between the first 20 model outputs on the training split and the
# 20 inputs that produced them.
calculate_fid(inception , train_outputs.numpy()[:20], train_sample.numpy()[:20])



5.130450216982433

In [None]:
# Distance between the model outputs and the original inputs over the whole test split.
calculate_fid(inception , test_outputs.numpy(), test_sample.numpy())



20.750793210066846