### Dataset: https://www.idmt.fraunhofer.de/en/publications/datasets/audio_effects.html

In [1]:
import numpy as np
import matplotlib.pyplot as plt

import keras
import tensorflow as tf

from tensorflow.keras import backend as K
from tensorflow.keras.layers import Lambda

from keras.models import Model
from keras.layers import Input, Conv1D, Dense, Activation, Concatenate, TimeDistributed, Lambda, Reshape
from keras.layers import Multiply, Add, UpSampling1D, MaxPooling1D, Bidirectional, LSTM, GlobalAvgPool1D
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.python.keras.utils import conv_utils
from keras.callbacks import LambdaCallback


import Utils as utils
from keras.utils import Sequence

#from Layers import Generator
import Models

from Layers import Conv1D_local, Dense_local, SAAF, Conv1D_tied, Slice

import random
import librosa
import scipy
import soundfile as sf
import json

random.seed(4264523625)

import brian2
from brian2hears import erbspace, Gammatone, Sound
from brian2 import Hz
from scipy.signal import hilbert
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.metrics.pairwise import paired_distances
from collections import OrderedDict
import sys
import os
import wave

from IPython.display import Audio

from scipy.io import wavfile
import resampy
import ctypes

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*hann from 'scipy.signal' is deprecated.*")


In [2]:
# Prevent sleep
ES_CONTINUOUS = 0x80000000
ES_SYSTEM_REQUIRED = 0x00000001
ctypes.windll.kernel32.SetThreadExecutionState(ES_CONTINUOUS | ES_SYSTEM_REQUIRED)

-2147483648

In [None]:
#pip install resampy

In [3]:
kSR = 16000
kContext = 4
epoch =  4
filters = 128
kernelSize = 64
learningRate = 0.0001
winLength = 4096
modelsPath = "./Models/"
monitorLoss =  "val_loss"


In [4]:
def BooleanMask(x):
    output = tf.cast(tf.greater_equal(x[0], x[1]), dtype=tf.float32)
    output = tf.multiply(output, x[1])
    return output

def toPermuteDimensions(x):
    return K.permute_dimensions(x, (0, 2, 1))

def absolute_activation(x):
    return tf.abs(x)

In [5]:
def overlap(x, x_len, win_length, hop_length, windowing=True, rate=1):
    x = x.reshape(x.shape[0], x.shape[1]).T
    
    if windowing:
        window = scipy.signal.hann(win_length, sym=False)
        #window = scipy.signal.get_window(('hann', False), win_length)
        rate = rate * hop_length / win_length
    else:
        window = 1
        rate = 1
    n_frames = x_len / hop_length
    expected_signal_len = int(win_length + hop_length * (n_frames))
    y = np.zeros(expected_signal_len)
    for i in range(int(n_frames)):
        sample = i * hop_length
        w = x[:, i]
        y[sample:(sample + win_length)] = y[sample:(sample + win_length)] + w * window
    y = y[int(win_length // 2):-int(win_length // 2)]
    return np.float32(y * rate)


In [6]:
kGfmin, kGfmax, kGbands = 26, 6950, 12
kMfmin, kMfmax, kMbands = 0.5, 100, 12
kEsr = 400

cfG = erbspace(kGfmin*Hz, kGfmax*Hz, kGbands)
cfM = erbspace(kMfmin*Hz, kMfmax*Hz, kMbands)

kEfmin = cfM[:-1]
kEfmax = cfM[1:]

def getGammatone(x, fmin, fmax, bands, sr):
    cf = erbspace(fmin*Hz, fmax*Hz, bands)
    gfb = Gammatone(Sound(x, samplerate=sr*Hz), cf)
    gamma = gfb.process()
    return gamma

def getFFT(x):
    n = len(x) # length of the signal
    x = x*scipy.signal.hann(n, sym=False)
    XD = np.fft.fft(x)/n # fft computing and normalization
    XD = XD[range(n//2)]
    
    return XD

def getModulation(x, fmin, fmax, bands, sr):
    cf = erbspace(fmin*Hz, fmax*Hz, bands)
    m = []
    for i in range(x.shape[1]):
        gfb = Gammatone(Sound(x[:,i], samplerate=sr*Hz), cf)
        m.append(gfb.process())
    return np.asarray(m)

def getEnvelope(x, power = True, downsample = None):
    
    envs = []
    for i in range(x.shape[1]):
        analytic_signal = hilbert(x[:,i])
        amplitude_envelope = (np.abs(analytic_signal))
        if power:
            amplitude_envelope = np.power(amplitude_envelope, 0.3)
        if downsample:
            amplitude_envelope = scipy.signal.resample(amplitude_envelope,
                                                       len(amplitude_envelope)//(downsample[0]//downsample[1]))
        envs.append(amplitude_envelope)
    envs = np.asarray(envs)
    
    return envs.T

def getMarginal(x):
    
    stats = OrderedDict()
    marginal = scipy.stats.describe(x)
    stats['mean'] = marginal[2]
    stats['var'] = marginal[3]/(marginal[2]**2)
    stats['skew'] = marginal[4]
    stats['kurtosis'] = marginal[5]
    return stats

def plotSubBands(X, sr):
    plt.figure(figsize=(18, 8))
    t = np.linspace(0, len(X[:,0])/sr, num=len(X[:,0]))
    for i in range(X.shape[1]):
        plf.plot(t, X[:,i])
        

def getModulationPower(x_m, x_e):
    m = []
    for i in range(kGbands):
        a = scipy.stats.describe(x_m[i]**2)
        m.append(a[2])
    return np.asarray(m) 

def getModulationSpectrum(x):
    x_mD = []
    for j in range(x.shape[0]):
        ffts = []
        for i in range(x.shape[2]):
            XD = getFFT(x[j,:,i])
            ffts.append(abs(XD))
        x_mD.append(ffts)
    x_mD = np.asarray(x_mD)
    return x_mD

def getModulationSpectrumEnergy(x, f1, f2, sr, normalizeDC = True):
    
    #returns modulation spectrum and modulation energy within specifc frequency bands
    energy = np.mean(x, axis = 1)**2
    if normalizeDC:
        energy = np.mean(energy, axis = 0)/np.mean(energy, axis = 0)[0]
    else:
        energy = np.mean(energy, axis = 0)
    
    energyBands = []
    for fmin, fmax in zip(f1, f2):
        
        bin1 = int(np.round(energy.shape[0]*fmin/(sr/2)))
        bin2 = int(np.round(energy.shape[0]*fmax/(sr/2)))
        energyBands.append(np.sum(energy[bin1:bin2+1]))
    
    modulationspectrum = np.mean(x, axis = 1)
    modulationspectrum = np.mean(modulationspectrum, axis = 0)
        
    return modulationspectrum, np.asarray(energyBands)

def getMeanLogModulationSpectrum(x):
    
    x = np.mean(x, axis=0)
    x = np.mean(x, axis=0)
    x = np.log(x + 1e-10)
    return x

def getMP(audio, kLen):

    x_modulationEnergyBands = []

   
    x_g = getGammatone(audio[:kLen], kGfmin, kGfmax, kGbands, kSR)
    x_ge = getEnvelope(x_g, downsample = ((kSR, kEsr)), power = False)
    x_gem = getModulation(x_ge, kMfmin, kMfmax, kMbands, kEsr)
    x_ge_stats = getMarginal(x_ge)
    m_x = getModulationPower(x_gem, x_ge_stats)
    x_gemD = getModulationSpectrum(x_gem)
    x_Em, x_Ebm = getModulationSpectrumEnergy(x_gemD, kEfmin, kEfmax, kEsr, normalizeDC = True)
    x_em_stats = getMarginal(x_Em[1:])
    x_gemD_meanlog = getMeanLogModulationSpectrum(x_gemD)
    
    
    return x_gemD_meanlog, x_Ebm, x_em_stats.values()

In [7]:
os.chdir("C:/Users/yavuz/Musiccodes/MyCAFModels/data/")

nofx_2_array = np.load("grandhall_stereo_data_right.npy")
print("Training Array Shape: ", nofx_2_array.shape)

nofx_2_array_test = np.load("grandhall_stereo_data_right.npy")
print("Test Array Shape", nofx_2_array_test.shape)

reverb_2_array = np.load("grandhall_atmos_data_center.npy")
print("Output Array Shape", reverb_2_array.shape)


Training Array Shape:  (104, 192000)
Test Array Shape (104, 192000)
Output Array Shape (104, 192000)


In [None]:
plt.figure(figsize=(12,4))
plt.plot(reverb_2_array[5] - nofx_2_array[5]);

In [8]:
nofx_2_array_16 = np.zeros((104, 64000), dtype=np.float32)
reverb_2_array_16 = np.zeros((104, 64000), dtype=np.float32)
nofx_2_array_test_16 = np.zeros((104, 64000), dtype=np.float32)

for i in range(104):
    if (i%20==0):
        print(i)
    nofx_2_array_test_16[i,:] = resampy.resample(nofx_2_array_test[i,:], sr_orig=48000, sr_new=16000)
    nofx_2_array_16[i,:] = resampy.resample(nofx_2_array[i,:], sr_orig=48000, sr_new=16000)
    reverb_2_array_16[i,:] = resampy.resample(reverb_2_array[i,:], sr_orig=48000, sr_new=16000)
    
print(nofx_2_array_test_16.shape, nofx_2_array_16.shape,reverb_2_array_16.shape)

0
20
40
60
80
100
(104, 64000) (104, 64000) (104, 64000)


In [None]:
plt.figure(figsize=(12,4))
plt.plot(reverb_2_array_16[5] - nofx_2_array_16[5]);

In [9]:
nofx_2_array_test = nofx_2_array_test_16
nofx_2_array = nofx_2_array_16
reverb_2_array = reverb_2_array_16

print(nofx_2_array_test.shape, nofx_2_array.shape, reverb_2_array.shape)

(104, 64000) (104, 64000) (104, 64000)


In [None]:
plt.figure(figsize=(12,4))
plt.plot(reverb_2_array[5] - nofx_2_array[5]);

In [10]:
data_length_include = 104

nofx_2_array = nofx_2_array[0:data_length_include,:]
print("Train", nofx_2_array.shape)

reverb_2_array = reverb_2_array[0:data_length_include,:]
print("Output", reverb_2_array.shape)

nofx_2_array_test = nofx_2_array_test[0:data_length_include,:]
print("Test", nofx_2_array_test.shape)

length_in_samples = nofx_2_array.shape[1]

print("Length in samples", length_in_samples)


Train (104, 64000)
Output (104, 64000)
Test (104, 64000)
Length in samples 64000


In [11]:
#nofx_2_array_reshaped = nofx_2_array.reshape(data_length_include, 32001, 1)
#reverb_2_array_reshaped = reverb_2_array.reshape(data_length_include, 32001, 1)

nofx_2_array_reshaped = nofx_2_array.reshape(data_length_include, length_in_samples, 1)
nofx_2_array_test_reshaped = nofx_2_array_test.reshape(data_length_include, length_in_samples, 1)
reverb_2_array_reshaped = reverb_2_array.reshape(data_length_include, length_in_samples, 1)

In [None]:
id_x = 12
plt.figure(figsize=(12,4))
plt.plot(nofx_2_array[id_x]);
plt.title("Train")
Audio(data=nofx_2_array[id_x], rate=16000)

In [None]:
plt.figure(figsize=(12,4))
plt.plot(reverb_2_array[id_x]);
plt.title("Output")
Audio(data=reverb_2_array[id_x], rate=16000)

In [None]:
id_x = 12
plt.figure(figsize=(12,4))
plt.plot(reverb_2_array[id_x] - nofx_2_array[id_x]);

In [12]:
class Generator(Sequence):

    def __init__(self, x_set, y_set, win_length, hop_length, win = False):
        self.x, self.y = x_set, y_set
        self.win_length = win_length
        self.hop_length = hop_length
        self.batch_size = int(self.x.shape[1] / self.hop_length) + 1
        self.win = win

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):

        batch_x = np.zeros((self.batch_size, self.win_length, 1))

        batch_y = np.zeros((self.batch_size, self.win_length, 1))


        x_w = self.x[idx].reshape(len(self.x[idx]))
        y_w = self.y[idx].reshape(len(self.y[idx]))


        x_w = utils.slicing(x_w, self.win_length, self.hop_length, windowing = self.win)
        y_w = utils.slicing(y_w, self.win_length, self.hop_length, windowing = self.win)

        for i in range(self.batch_size):

            batch_x[i] = x_w[i].reshape(self.win_length ,1)
            batch_y[i] = y_w[i].reshape(self.win_length ,1)


        return batch_x, batch_y

In [13]:
train_samples = int(len(nofx_2_array)*0.8)
print("Train samples", train_samples)


Xtrain = nofx_2_array_reshaped[0:train_samples,1:,:]
#Ytrain = chorus_array_reshaped[0:62,1:,:]
Ytrain = reverb_2_array_reshaped[0:train_samples,1:,:]
Xval = nofx_2_array_reshaped[train_samples:,1:,:]
#Yval = chorus_array_reshaped[62:,1:,:]
Yval = reverb_2_array_reshaped[train_samples:,1:,:]

print("Xtrain.shape", Xtrain.shape, Ytrain.shape, Xval.shape, Yval.shape)

# since the samples are 2 secs long, we zero pad kContext*hop_size samples at the end of the recording. This for the 4 
# subsequent frames in the Leslie modeling tasks.
Xtrain = utils.cropAndPad(Xtrain, crop = 0, pad = kContext*winLength//2)
Ytrain = utils.cropAndPad(Ytrain, crop = 0, pad = kContext*winLength//2)
Xval = utils.cropAndPad(Xval, crop = 0, pad = kContext*winLength//2)
Yval = utils.cropAndPad(Yval, crop = 0, pad = kContext*winLength//2)

print("Xtrain.shape after CropandPad", Xtrain.shape, Ytrain.shape, Xval.shape, Yval.shape)

Xtrain_pre = np.vstack((Xtrain, Ytrain))
Xval_pre = np.vstack((Xval, Yval))

Train samples 83
Xtrain.shape (83, 63999, 1) (83, 63999, 1) (21, 63999, 1) (21, 63999, 1)
Xtrain.shape after CropandPad (83, 72191, 1) (83, 72191, 1) (21, 72191, 1) (21, 72191, 1)


In [14]:
trainGen_pre = Generator(Xtrain_pre, Xtrain_pre, winLength, winLength//2)
valGen_pre = Generator(Xval_pre, Xval_pre, winLength, winLength//2)   

trainGen = Generator(Xtrain, Ytrain, winLength, winLength//2)
valGen = Generator(Xval, Yval, winLength, winLength//2) 

In [15]:
modelName = "Model_CWAFx"

earlyStopping_pre = Models.EarlyStopping(monitor='loss',
                                          min_delta=0,
                                          patience=25,
                                          verbose=1,
                                          mode='auto',
                                          baseline=None, 
                                          restore_best_weights=False)


checkpointer_pre = Models.ModelCheckpoint(filepath=modelsPath + modelName + "_chk.weights.h5",
                                           monitor=monitorLoss,
                                           verbose=1,
                                           save_best_only=True,
                                           save_weights_only=True)  

earlyStopping = Models.EarlyStopping(monitor='loss',
                                          min_delta=0,
                                          patience=25,
                                          verbose=1,
                                          mode='auto',
                                          baseline=None, restore_best_weights=False)

checkpointer = Models.ModelCheckpoint(filepath=modelsPath + modelName +".weights.h5",
                                           monitor=monitorLoss,
                                           verbose=1,
                                           save_best_only=True,
                                           save_weights_only=True)  
            

In [16]:
pwd

'C:\\Users\\yavuz\\Musiccodes\\MyCAFModels\\data'

In [None]:
print("Xtrain Shape", Xtrain.shape, "Ytrain Shape", Ytrain.shape, "Xval Shape", Xval.shape, "Yval Shape", Yval.shape)

In [17]:
def pretrainingModel(win_length, filters, kernel_size_1, learning_rate):

    x = Input(shape=(win_length, 1), name='input')
    print("PRETRAINING")
    #print(x)
    conv = Conv1D(filters, kernel_size_1, strides=1, padding='same',
                  kernel_initializer='lecun_uniform',
                  input_shape=(win_length, 1), name='conv')

    conv_smoothing = Conv1D_local(filters, kernel_size_1 * 2, strides=1, padding='same',
                                  kernel_initializer='lecun_uniform', name='conv_smoothing')

    deconv = Conv1D_tied(1, kernel_size_1, conv, padding='same', name='deconv')

    X = conv(x)
    #print("X shape", X.shape)
    #X_abs = Activation(K.abs, name='conv_activation')(X)
    #X_abs = Lambda(lambda x: K.abs(x), name='conv_activation')(X)
    #X_abs = Activation(absolute_activation, name='conv_activation')(X)
    #X_abs = Lambda(lambda x: K.abs(x), name='conv_activation')(X)

    #X_abs = Lambda(lambda x: K.abs(x), name='conv_activation', output_shape=lambda s:s)(X)
    #X_abs = Lambda(lambda x: Activation(x), name='conv_activation', output_shape=lambda s:s)(X)
    X_abs = Lambda(lambda x: tf.abs(x), name='conv_activation')(X)

    #X_abs = Activation(absolute_activation, name='conv_activation')(X)
    
    M = conv_smoothing(X_abs)
    M = Activation('softplus', name='conv_smoothing_activation')(M)
    print("M Shape", M.shape)

    P = X
    Z = MaxPooling1D(pool_size=win_length // 64, name='max_pooling')(M)
    M_ = UpSampling1D(size=win_length // 64, name='up_sampling_naive')(Z)

    #print("M_ Shape", M_.shape)
    #M_ = Lambda((BooleanMask), name='boolean_mask')([M, M_])

    M_ = Lambda(BooleanMask, name='boolean_mask', output_shape=lambda s: s[0])([M, M_])
    

    Y = Multiply(name='phase_unpool_multiplication')([P, M_])
    Y = deconv(Y)

    model = Model(inputs=[x], outputs=[Y])

    model.compile(loss={'deconv': 'mae'},
                  loss_weights={'deconv': 1.0},
                  optimizer=Adam(learning_rate=learning_rate))

    return model


In [None]:
def pretrainingModel(win_length, filters, kernel_size_1, learning_rate):

    x = Input(shape=(win_length, 1), name='input')
    conv = Conv1D(filters, kernel_size_1, strides=1, padding='same',
                  kernel_initializer='lecun_uniform',
                  input_shape=(win_length, 1), name='conv')

    conv_smoothing = Conv1D_local(filters, kernel_size_1 * 2, strides=1, padding='same',
                                  kernel_initializer='lecun_uniform', name='conv_smoothing')

    deconv = Conv1D_tied(1, kernel_size_1, conv, padding='same', name='deconv')

    X = conv(x)

    X_abs = Lambda(lambda x: tf.abs(x), name='conv_activation')(X)

    M = conv_smoothing(X_abs)
    M = Activation('softplus', name='conv_smoothing_activation')(M)
    print("M Shape", M.shape)

    P = X
    Z = MaxPooling1D(pool_size=win_length // 64, name='max_pooling')(M)
    M_ = UpSampling1D(size=win_length // 64, name='up_sampling_naive')(Z)


    M_ = Lambda(BooleanMask, name='boolean_mask', output_shape=lambda s: s[0])([M, M_])
    

    Y = Multiply(name='phase_unpool_multiplication')([P, M_])
    Y = deconv(Y)

    model = Model(inputs=[x], outputs=[Y])

    model.compile(loss={'deconv': 'mae'},
                  loss_weights={'deconv': 1.0},
                  optimizer=Adam(learning_rate=learning_rate))

    return model

In [18]:
model_CWAFx_pretraining = pretrainingModel(winLength,
                                    filters, 
                                    kernelSize, 
                                    learningRate);

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


PRETRAINING

M Shape (None, 4096, 128)


In [None]:
model_CWAFx_pretraining = pretrainingModel(winLength,
                                    filters, 
                                    kernelSize, 
                                    learningRate);

In [19]:
model_CWAFx_pretraining.fit(trainGen_pre,
                           steps_per_epoch=None,
                           epochs=2,
                           verbose=1,
                           callbacks = [checkpointer_pre, earlyStopping_pre],
                           validation_data = valGen_pre,
                           validation_steps=len(Xval),
                           shuffle=True);

  self._warn_if_super_not_called()


Epoch 1/2




[1m166/166[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - loss: 0.0046

  self._warn_if_super_not_called()



Epoch 1: val_loss improved from inf to 0.00226, saving model to ./Models/Model_CWAFx_chk.weights.h5
[1m166/166[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m448s[0m 3s/step - loss: 0.0046 - val_loss: 0.0023
Epoch 2/2
[1m166/166[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - loss: 0.0035
Epoch 2: val_loss did not improve from 0.00226
[1m166/166[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m492s[0m 3s/step - loss: 0.0035 - val_loss: 0.0050


In [None]:
model.fit(trainGen_pre,
          steps_per_epoch=None,
                           epochs=2,
                           verbose=2,
                           callbacks = [checkpointer_pre, earlyStopping_pre],
                           validation_data = valGen_pre,
                           validation_steps=len(Xval),
                           shuffle=True);

In [None]:
model_CWAFx_pretraining.summary()


In [20]:
class GeneratorContext(Sequence):

    def __init__(self, x_set, y_set, context, win_length, hop_length, win = False, win_input = None):
        self.x, self.y = x_set, y_set
        self.win_length = win_length
        self.hop_length = hop_length
        self.batch_size = int(self.x.shape[1] / self.hop_length) + 1
        self.win_output = win
        if win_input == None:
            self.win_input = win
        else:
            self.win_input = win_input
        self.context = context

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        
        batch_x = []
        for i in range(self.context*2+1):
            batch_x.append(np.zeros((self.batch_size, self.win_length, 1)))
        batch_y = np.zeros((self.batch_size, self.win_length, 1))
        
        
        x_w = self.x[idx].reshape(len(self.x[idx]))
        y_w = self.y[idx].reshape(len(self.y[idx]))

        
        x_w = utils.slicing(x_w, self.win_length, self.hop_length, windowing = self.win_input)

        x_w = np.pad(x_w, ((self.context, self.context),(0, 0)), 'constant', constant_values=(0))
        a = []
        for i in range(x_w.shape[0]):
            a.append(x_w[i:i+self.context*2+1])
        del a[-self.context*2:]
        a = np.asarray(a)
       
        y_w = utils.slicing(y_w, self.win_length, self.hop_length, windowing = self.win_output)
        
        for i in range(self.batch_size):
            
            for j in range(self.context*2+1):
                batch_x[j][i] = a[:,j,:][i].reshape(self.win_length,1)
                       
            batch_y[i] = y_w[i].reshape(self.win_length,1) 
            
        batch_x = np.swapaxes(np.asarray(batch_x), 0, 1)
        
        return batch_x, batch_y  

In [21]:
def se_block(x, num_features, weight_decay=0., amplifying_ratio=16, idx = 1):
    x = Multiply(name='dnn-saaf-se_%s'%idx)([x, se_fn(x, amplifying_ratio, idx)])
    return x
def se_fn(x, amplifying_ratio, idx):
    #num_features = x.shape[-1].value
    num_features = x.shape[-1]
    x = Activation(K.abs)(x)
    x = GlobalAvgPool1D()(x)
    x = Reshape((1, num_features))(x)
    x = Dense(num_features * amplifying_ratio, activation='relu', kernel_initializer='glorot_uniform',
              name='se_dense1_%s'%idx)(x)
    x = Dense(num_features, activation='sigmoid', kernel_initializer='glorot_uniform',
              name='se_dense2_%s'%idx)(x)
    return x

## CWAFx and Wavenet

In [22]:
def dilated_residual_block(data_x, res_block_i, layer_i, dilation, stack_i, config,
                           num_residual_blocks, input_length, samples_of_interest_indices,
                           padded_target_field_length, context=True):
    
    

    original_x = data_x
    bias = True

    # Data sub-block
    data_out = keras.layers.Conv1D(2 * config['model']['filters']['depths']['res'],
                                                config['model']['filters']['lengths']['res'],
                                                dilation_rate=dilation, padding='same',
                                                use_bias=bias,
                                                name='res_%d_dilated_conv_d%d_s%d' % (
                                                res_block_i, dilation, stack_i),
                                                activation=None)(data_x)
    
    data_out_1 = Slice(
            (Ellipsis, slice(0, config['model']['filters']['depths']['res'])),
            (input_length, config['model']['filters']['depths']['res']),
            name='res_%d_data_slice_1_d%d_s%d' % (num_residual_blocks, dilation, stack_i))(data_out)
    
    data_out_2 = Slice(
            (Ellipsis, slice(config['model']['filters']['depths']['res'],
                             2 * config['model']['filters']['depths']['res'])),
            (input_length, config['model']['filters']['depths']['res']),
            name='res_%d_data_slice_2_d%d_s%d' % (num_residual_blocks, dilation, stack_i))(data_out)
   
   
    
    tanh_out = keras.layers.Activation('tanh')(data_out_1)
    sigm_out = keras.layers.Activation('sigmoid')(data_out_2)
    
    data_x = keras.layers.Multiply(name='res_%d_gated_activation_%d_s%d'
                                   % (res_block_i, layer_i, stack_i))([tanh_out, sigm_out])

    data_x = keras.layers.Conv1D(config['model']['filters']['depths']['res']
                                 + config['model']['filters']['depths']['skip'],
                                1,
                                padding='same',
                                use_bias = bias,
                                name='res_%d_conv_d%d_s%d' % (
                                res_block_i, dilation, stack_i))(data_x)
    
    res_x = Slice((Ellipsis, slice(0, config['model']['filters']['depths']['res'])),
                             (input_length, config['model']['filters']['depths']['res']),
                         name='res_%d_data_slice_3_d%d_s%d' % (res_block_i, dilation, stack_i))(data_x)
    
    skip_x = Slice((Ellipsis, slice(config['model']['filters']['depths']['res'],
                                               config['model']['filters']['depths']['res'] +
                                               config['model']['filters']['depths']['skip'])),
                              (input_length, config['model']['filters']['depths']['skip']),
                          name='res_%d_data_slice_4_d%d_s%d' % (res_block_i, dilation, stack_i))(data_x)
    
    if context == False:
        samples_of_interest_indices[0] = 0
        samples_of_interest_indices[-1] = config['model']['input_length']#k['win_length']
    
    skip_x = Slice((slice(samples_of_interest_indices[0], samples_of_interest_indices[-1], 1),
                               Ellipsis),
                   (padded_target_field_length, config['model']['filters']['depths']['skip']),
                   name='res_%d_keep_samples_of_interest_d%d_s%d' % (res_block_i, dilation, stack_i))(skip_x)
    
    res_x = keras.layers.Add()([original_x, res_x])
    

    return res_x, skip_x 


In [23]:
def wavenet(data_input, config, contextFrames=0, output_channels=1, context=True):

    

    num_residual_blocks = len(config['model']['dilations']) * config['model']['num_stacks']
    
    input_length = config['model']['input_length']
    if input_length == None:
        input_length = config['model']['target_field_length']*(contextFrames + 1)
    
    target_field_length = config['model']['target_field_length']
    half_target_field_length = target_field_length // 2
    target_padding = config['model']['target_padding']
    target_sample_index = int(np.floor(input_length / 2.0))
    samples_of_interest_indices = range(target_sample_index - half_target_field_length - target_padding,
                                        target_sample_index + half_target_field_length + target_padding + 1)
    padded_target_field_length = target_field_length + 2 * target_padding
   
    
    data_out = keras.layers.Conv1D(config['model']['filters']['depths']['res'],
                                                  config['model']['filters']['lengths']['res'], padding='same',
                                                  use_bias=True, name='initial_causal_conv')(data_input)
    
    skip_connections = []
    res_block_i = 0
    for stack_i in range(config['model']['num_stacks']):
        layer_in_stack = 0
        for dilation in config['model']['dilations']:
            res_block_i += 1
            data_out, skip_out = dilated_residual_block(data_out,
                                                        res_block_i,
                                                        layer_in_stack,
                                                        dilation,
                                                        stack_i,
                                                        config, 
                                                        num_residual_blocks, 
                                                        input_length, 
                                                        samples_of_interest_indices, 
                                                        padded_target_field_length,
                                                       context=context)
            if skip_out is not None:
                skip_connections.append(skip_out)
            layer_in_stack += 1

    skip_connections = keras.layers.Lambda(lambda inputs: tf.convert_to_tensor(inputs))(skip_connections)        

    skip_connections = keras.layers.Lambda(lambda inputs: tf.keras.backend.sum(inputs,
                                                                               axis=0,
                                                                               keepdims=False))(skip_connections)   

    data_out = keras.layers.Activation('relu')(skip_connections)

    data_out = keras.layers.Conv1D(config['model']['filters']['depths']['final'][0],
                                          config['model']['filters']['lengths']['final'][0], padding='same',
                                                  use_bias=True, name='penultimate_conv_1d')(data_out)

    
    data_out = keras.layers.Activation('relu')(data_out)

    data_out = keras.layers.Conv1D(config['model']['filters']['depths']['final'][1],
                                          config['model']['filters']['lengths']['final'][1], padding='same',
                                                  use_bias=True, name='final_conv_1d')(data_out)


    
    
    return data_out



In [24]:
def WaveNet(learning_rate, wavenetConfig):
    
    data_input = Input(shape=(wavenetConfig['model']['input_length'], 1), name='data_input')

    
    data_out = wavenet(data_input, wavenetConfig)
    
    data_out = keras.layers.Conv1D(1, 1, name='conv1d_1')(data_out)
    
    model = Model(inputs=[data_input], outputs=[data_out])     
    
 
        
    model.compile(loss='mae',
                  optimizer=Adam(learning_rate=learning_rate))
    
    
    
    return model



In [25]:
def CWAFx(win_length, filters, kernel_size_1, learning_rate, wavenetConfig):
    
    kContext = 4 # past and subsequent frames
    
    x = Input(shape=(kContext*2+1, win_length, 1), name='input')
    
    conv = Conv1D(filters, kernel_size_1, strides=1, padding='same',
                       kernel_initializer='lecun_uniform', input_shape=(win_length, 1))
    
    activation_abs = Activation(K.abs)
    activation_sp = Activation('softplus')
    max_pooling = MaxPooling1D(pool_size=win_length//64)

    conv_smoothing = Conv1D_local(filters, kernel_size_1*2, strides=1, padding='same',
                                  kernel_initializer='lecun_uniform')
    
   
    deconv = Conv1D_tied(1, kernel_size_1, conv, padding='same', name='deconv')
     
        
    X = TimeDistributed(conv, name='conv')(x)
    X_abs = TimeDistributed(activation_abs, name='conv_activation')(X)
    M = TimeDistributed(conv_smoothing, name='conv_smoothing')(X_abs)
    M = TimeDistributed(activation_sp, name='conv_smoothing_activation')(M)
    P = X
    Z = TimeDistributed(max_pooling, name='max_pooling')(M)
    Z = Lambda(lambda inputs: tf.unstack(inputs, num=kContext*2+1, axis=1, name='unstack2'))(Z)
    Z = Concatenate(name='concatenate', axis=-2)(Z)
    
    Z = wavenet(Z, wavenetConfig, contextFrames=kContext, output_channels=filters, context=True)
  
    Z = Lambda((toPermuteDimensions), name='perm_1')(Z)
    Z = Dense(win_length//64, activation = 'tanh', name = 'dense_wn')(Z)
    Z = Lambda((toPermuteDimensions), name='perm_2')(Z)
    
    M_ = UpSampling1D(size=win_length//64, name='up_sampling_naive')(Z)
    P = Lambda(lambda inputs: tf.unstack(inputs, num=kContext*2+1, axis=1, name='unstack'))(P)
    Y = Multiply(name='phase_unpool_multiplication')([P[kContext],M_])

    Y_ = Dense(filters, activation = 'tanh', name = 'dense_in')(Y)
    Y_ = Dense(filters//2, activation = 'tanh', name = 'dense_h1')(Y_)   
    Y_ = Dense(filters//2, activation = 'tanh', name = 'dense_h2')(Y_)
    Y_ = Dense(filters, activation = 'linear', name = 'dense_out')(Y_)
 
    Y_ = SAAF(break_points=25, break_range=0.2, magnitude=100, order=2, tied_feamap=True,
            kernel_initializer = 'random_normal', name = 'saaf_out')(Y_)
    
    Y_ = se_block(Y_, filters, weight_decay=0., amplifying_ratio=16, idx = 1)
    
    Y = Add(name='addition')([Y,Y_])
    
    Y = deconv(Y)
    
    model = Model(inputs=[x], outputs=[Y])
    
    model.compile(loss={'deconv': 'mae'},
                        loss_weights={'deconv': 1.0},
                        optimizer=Adam(learning_rate=learning_rate))
    

    return model




In [26]:
CWAFx_config = { 'epoch' : 2000,
                    'filters' : 32,
                    'kernelSize' : 64,
                    'learningRate' : 0.0001,
                    'winLength' : 4096,
                    'modelsPath': './Models/',
                    'monitorLoss': 'val_loss',
                    'wavenetConfig': {
                        'model': {
                        'dilations': [1, 2, 4, 8, 16, 32, 64],
                        'filters': {'depths': {'final': [32, 32],
                        'res': 32,
                        'skip': 32},
                        'lengths': {'final': [3, 3], 'res': 3, 'skip': 1}},
                        'input_length': 576,
                        'num_stacks': 2,
                        'target_field_length': 576,
                        'target_padding': 0}}
               }

In [27]:
trainGen = GeneratorContext(Xtrain, Ytrain, kContext, CWAFx_config['winLength'], CWAFx_config['winLength']//2)
valGen = GeneratorContext(Xval, Yval, kContext, CWAFx_config['winLength'], CWAFx_config['winLength']//2)

In [28]:
model_CWAFx = CWAFx(CWAFx_config['winLength'], 
                    CWAFx_config['filters'],  
                    CWAFx_config['kernelSize'], 
                    CWAFx_config['learningRate'], 
                    CWAFx_config['wavenetConfig'])

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [None]:
model_CWAFx.summary()

In [29]:
os.chdir('C:/Users/yavuz/Musiccodes/MyCAFModels/Models/')
model_CWAFx.load_weights('Model_CWAFx_chk.weights.h5');

  saveable.load_own_variables(weights_store.get(inner_path))


ValueError: A total of 41 objects could not be loaded. Example error message for object <Conv1D name=conv1d, built=True>:

Layer 'conv1d' expected 2 variables, but received 0 variables during loading. Expected: ['kernel', 'bias']

List of objects that could not be loaded:
[<Conv1D name=conv1d, built=True>, <Conv1D_local name=conv1d_local, built=True>, <Conv1D name=initial_causal_conv, built=True>, <Conv1D name=res_1_dilated_conv_d1_s0, built=True>, <Conv1D name=res_1_conv_d1_s0, built=True>, <Conv1D name=res_2_dilated_conv_d2_s0, built=True>, <Conv1D name=res_2_conv_d2_s0, built=True>, <Conv1D name=res_3_dilated_conv_d4_s0, built=True>, <Conv1D name=res_3_conv_d4_s0, built=True>, <Conv1D name=res_4_dilated_conv_d8_s0, built=True>, <Conv1D name=res_4_conv_d8_s0, built=True>, <Conv1D name=res_5_dilated_conv_d16_s0, built=True>, <Conv1D name=res_5_conv_d16_s0, built=True>, <Conv1D name=res_6_dilated_conv_d32_s0, built=True>, <Conv1D name=res_6_conv_d32_s0, built=True>, <Conv1D name=res_7_dilated_conv_d64_s0, built=True>, <Conv1D name=res_7_conv_d64_s0, built=True>, <Conv1D name=res_8_dilated_conv_d1_s1, built=True>, <Conv1D name=res_8_conv_d1_s1, built=True>, <Conv1D name=res_9_dilated_conv_d2_s1, built=True>, <Conv1D name=res_9_conv_d2_s1, built=True>, <Conv1D name=res_10_dilated_conv_d4_s1, built=True>, <Conv1D name=res_10_conv_d4_s1, built=True>, <Conv1D name=res_11_dilated_conv_d8_s1, built=True>, <Conv1D name=res_11_conv_d8_s1, built=True>, <Conv1D name=res_12_dilated_conv_d16_s1, built=True>, <Conv1D name=res_12_conv_d16_s1, built=True>, <Conv1D name=res_13_dilated_conv_d32_s1, built=True>, <Conv1D name=res_13_conv_d32_s1, built=True>, <Conv1D name=res_14_dilated_conv_d64_s1, built=True>, <Conv1D name=res_14_conv_d64_s1, built=True>, <Conv1D name=penultimate_conv_1d, built=True>, <Conv1D name=final_conv_1d, built=True>, <Dense name=dense_wn, built=True>, <Dense name=dense_in, built=True>, <Dense name=dense_h1, built=True>, <Dense name=dense_h2, built=True>, <Dense name=dense_out, built=True>, <SAAF name=saaf_out, built=True>, <Dense name=se_dense1_1, built=True>, <Dense name=se_dense2_1, built=True>]

In [None]:
model_CWAFx.load_weights(modelsPath+'Model_CWAFx' +'_chk.weights.h5', skip_mismatch=True) ;
print ('CWAFx Pretraining finished.');

In [None]:
model_CWAFx.load_weights(modelsPath + modelName+'.weights.h5')

In [None]:
os.chdir('C:/Users/yavuz/Musiccodes/MyCAFModels/Models/')
#model_CWAFx.load_weights('Model_CWAFx.weights.h5');


In [None]:
model_CWAFx.fit(trainGen,
                steps_per_epoch=None,
                epochs= 16,
                verbose=2,
                callbacks = [checkpointer, earlyStopping],
                validation_data = valGen,
                validation_steps=len(Xval),
                shuffle=True)

In [None]:
os.chdir('C:/Users/YavuzBURUKPEAKUP/Kods/MyCAFModels/Models/')
model_CWAFx.save_weights('Test_A_L_A_L12_16K.weights.h5');

In [None]:
def write_wav_file(filename, sampling_rate, audio_data):
    # Ensure 16-bit integer format
    if audio_data.dtype != np.int16:
        audio_data = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)
    
    with wave.open(filename, 'wb') as wav_file:
        wav_file.setnchannels(1 if len(audio_data.shape) == 1 else audio_data.shape[1])
        wav_file.setsampwidth(2)  # 2 bytes = 16 bits
        wav_file.setframerate(sampling_rate)
        wav_file.writeframes(audio_data.tobytes())


In [None]:
def read_wav_file(filename):
    with wave.open(filename, 'rb') as wav_file:
        # Get audio parameters
        frames = wav_file.getnframes()
        sample_rate = wav_file.getframerate()
        channels = wav_file.getnchannels()
        sample_width = wav_file.getsampwidth()
        
        # Read audio data
        audio_data = wav_file.readframes(frames)
        
        # Convert to numpy array
        if sample_width == 1:
            dtype = np.uint8
        elif sample_width == 2:
            dtype = np.int16
        elif sample_width == 4:
            dtype = np.int32
        else:
            raise ValueError(f"Unsupported sample width: {sample_width}")
        
        audio_array = np.frombuffer(audio_data, dtype=dtype)
        
        # Reshape for stereo
        if channels > 1:
            audio_array = audio_array.reshape(-1, channels)
        
        return sample_rate, audio_array

In [None]:
models

In [None]:
#nofx_2_array_test = np.load("C:/Users/YavuzBURUKPEAKUP/Kods/MyCAFModels/data/grandhall_stereo_data_left.npy")
#reverb_2_array = np.load("C:/Users/YavuzBURUKPEAKUP/Kods/MyCAFModels/data/grandhall_atmos_data_SR.npy")

#nofx_2_array_test = nofx_2_array_test[0:data_length_include,:]
#reverb_2_array = nofx_2_array[0:data_length_include,:]

os.chdir('C:/Users/YavuzBURUKPEAKUP/Kods/MyCAFModels/Models/')
model_CWAFx.load_weights('CH_Model_CWAFx_A_R_A_R_4_16K.weights.h5');


In [None]:
nofx_2_array.shape

In [None]:
os.chdir('C:/Users/yavuz/Musiccodes/MyCAFModels/Output/')

print('CWAFx Evaluating ')
metrics = {}
mae = []
mfcc_cosine = []
mse_y = []
mse_z = []

main_idx = 5

Xtest_pre = nofx_2_array[main_idx][0:length_in_samples]
Ytest_pre = reverb_2_array[main_idx][0:length_in_samples]

Xtest = Xtest_pre.reshape(1,length_in_samples,1)
Ytest = Ytest_pre.reshape(1,length_in_samples,1)

Xtest = utils.cropAndPad(Xtest, crop = 0, pad = kContext*winLength//2)
Ytest = utils.cropAndPad(Ytest, crop = 0, pad = kContext*winLength//2)

kLen = Xtest.shape[1]
kBatch = int((kLen/(winLength//2)) + 1)

testGen = GeneratorContext(Xtest, Ytest, kContext, winLength, winLength//2)

for idx in range(Xtest.shape[0]):
        x = testGen[idx][0]
        Z = model_CWAFx.predict(x, batch_size=kBatch)
        Z_m = Z[:,:,0]
        Ztest_waveform = overlap(Z_m, kLen, winLength, winLength//2, windowing=True, rate=2)
        Ytest_waveform = Ytest[idx].reshape(Ytest[idx].shape[0])
        audio_data = (Ztest_waveform * 32767).astype(np.int16)
        write_wav_file('Waveform12.wav', 16000, audio_data)

       
        mae.append(utils.getMAEnormalized(Ytest_waveform, Ztest_waveform))
        d = utils.getMSE_MFCC(Ytest_waveform, Ztest_waveform, kSR, mean_norm=False)    
        mfcc_cosine.append(d['cosine'])
        ms, e_y, _ = getMP(Ytest_waveform, kLen)
        ms, e_z, _ = getMP(Ztest_waveform, kLen)
        mse_y.append(e_y)
        mse_z.append(e_z)

d = utils.getDistances(np.asarray(mse_y), np.asarray(mse_z))    
metrics['mae'] = round(np.mean(mae), 5)
metrics['mfcc_cosine'] = round(np.mean(mfcc_cosine), 5)
metrics['msed'] = round(np.mean(d['euclidean']), 5)
    
for metric in metrics.items():
    print(metric)

print('Evaluation finished.')

In [None]:
main_idxs = np.arange(0, 104)
print(len(main_idxs))
main_idxs

In [None]:
os.chdir('C:/Users/yavuz/Musiccodes/MyCAFModels/ModelsLast/')

my_models = ['Model_C_16.weights.h5', 'Model_L_16.weights.h5', 'Model_L_L_16.weights.h5', 'Model_L_RSL_16.weights.h5',
             'Model_L_SL_16.weights.h5', 'Model_L_TFL_16.weights.h5', 'Model_R_16.weights.h5', 'Model_R_R_16.weights.h5',
             'Model_R_RSR_16.weights.h5', 'Model_R_SR_16.weights.h5', 'Model_R_TFR_16.weights.h5', 'Model_RSL_16.weights.h5',
             'Model_RSR_16.weights.h5', 'Model_SL_16.weights.h5', 'Model_SR_16.weights.h5', 'Model_TFL_16.weights.h5',
             'Model_TFR_16.weights.h5']

model_idx = 2
model_to_test = my_models[model_idx]
print("Model: ", model_to_test)
model_CWAFx.load_weights(model_to_test);

In [None]:
os.chdir("C:/Users/yavuz/Musiccodes/MyCAFModels/data/")

nofx_2_array = np.load("grandhall_stereo_data_left.npy")
print("Training Array Shape: ", nofx_2_array.shape)

reverb_2_array = np.load("grandhall_atmos_data_left.npy")
print("Output Array Shape", reverb_2_array.shape)

nofx_2_array_test_16 = np.zeros((104, 64000), dtype=np.float32)
reverb_2_array_16 = np.zeros((104, 64000), dtype=np.float32)

for i in range(104):
    if (i%20==0):
        print(i)
    nofx_2_array_16[i,:] = resampy.resample(nofx_2_array[i,:], sr_orig=48000, sr_new=16000)
    reverb_2_array_16[i,:] = resampy.resample(reverb_2_array[i,:], sr_orig=48000, sr_new=16000)
    

print(nofx_2_array_16.shape, reverb_2_array_16.shape)

nofx_2_array = nofx_2_array_16
reverb_2_array = reverb_2_array_16

print(nofx_2_array.shape, reverb_2_array.shape)

data_length_include = 104

nofx_2_array = nofx_2_array[0:data_length_include,:]
print("Train", nofx_2_array.shape)

reverb_2_array = reverb_2_array[0:data_length_include,:]
print("Output", reverb_2_array.shape)

length_in_samples = nofx_2_array.shape[1]

print("Length in samples", length_in_samples)

nofx_2_array_reshaped = nofx_2_array.reshape(data_length_include, length_in_samples, 1)
reverb_2_array_reshaped = reverb_2_array.reshape(data_length_include, length_in_samples, 1)

In [None]:
os.chdir('C:/Users/yavuz/Musiccodes/MyCAFModels/Output/')

print('CWAFx Evaluating ')
metrics = {}
mae_s = []
mfcc_cosine_s = []
msed_s = []
mse_y = []
mse_z = []

for main_idx in main_idxs:
    print("main_idx", main_idx)
    #main_idx = 5
    mae = []
    mfcc_cosine = []
    mse_y = []
    mse_z = []


    
    Xtest_pre = nofx_2_array[main_idx][0:length_in_samples];
    Ytest_pre = reverb_2_array[main_idx][0:length_in_samples];
    
    Xtest = Xtest_pre.reshape(1,length_in_samples,1);
    Ytest = Ytest_pre.reshape(1,length_in_samples,1);
    
    Xtest = utils.cropAndPad(Xtest, crop = 0, pad = kContext*winLength//2);
    Ytest = utils.cropAndPad(Ytest, crop = 0, pad = kContext*winLength//2);
    
    kLen = Xtest.shape[1];
    kBatch = int((kLen/(winLength//2)) + 1);
    
    testGen = GeneratorContext(Xtest, Ytest, kContext, winLength, winLength//2);
    
    for idx in range(Xtest.shape[0]):
            x = testGen[idx][0];
            Z = model_CWAFx.predict(x, batch_size=kBatch);
            Z_m = Z[:,:,0];
            Ztest_waveform = overlap(Z_m, kLen, winLength, winLength//2, windowing=True, rate=2);
            Ytest_waveform = Ytest[idx].reshape(Ytest[idx].shape[0]);
            #audio_data = (Ztest_waveform * 32767).astype(np.int16);
            #write_wav_file('Waveform' + str(main_idx) + '.wav', 16000, audio_data);
    
           
            mae.append(utils.getMAEnormalized(Ytest_waveform, Ztest_waveform));
            d = utils.getMSE_MFCC(Ytest_waveform, Ztest_waveform, kSR, mean_norm=False);    
            mfcc_cosine.append(d['cosine']);
            ms, e_y, _ = getMP(Ytest_waveform, kLen);
            ms, e_z, _ = getMP(Ztest_waveform, kLen);
            mse_y.append(e_y);
            mse_z.append(e_z);
    
    d = utils.getDistances(np.asarray(mse_y), np.asarray(mse_z));    
    #print('MAE', round(np.mean(mae), 5))
    #print("MCOSINE", round(np.mean(mfcc_cosine), 5))
    #print("MSD", round(np.mean(d['euclidean']), 5))
    mae_s.append(round(np.mean(mae), 5))
    mfcc_cosine_s.append(round(np.mean(mfcc_cosine), 5))
    msed_s.append(round(np.mean(d['euclidean']), 5))
    metrics['mae'] = round(np.mean(mae), 5);
    metrics['mfcc_cosine'] = round(np.mean(mfcc_cosine), 5);
    metrics['msed'] = round(np.mean(d['euclidean']), 5);
        
    #for metric in metrics.items():
    #    print(metric)
    
print('Evaluation finished.')

In [None]:
print(np.mean(mae_s))
print(np.median(mae_s))
print(np.mean(mfcc_cosine_s))
print(np.median(mfcc_cosine_s))
print(np.mean(msed_s))
print(np.median(msed_s))

In [None]:
plt.plot(mae_s);
print(np.mean(mae_s))
print(np.median(mae_s))

In [None]:
plt.plot(mfcc_cosine_s)
print(np.mean(mfcc_cosine_s))
print(np.median(mfcc_cosine_s))

In [None]:
plt.plot(msed_s)
print(np.mean(msed_s))
print(np.median(msed_s))

In [None]:
plt.plot(audio_data)

In [None]:
time = np.linspace(0, 4, length_in_samples)

plt.figure(figsize=(12,12))
plt.subplot(3, 1, 1) 
plt.plot(time, Xtest_pre/max(Xtest_pre));
plt.title("Input Test Signal", fontsize=10)


plt.subplot(3, 1, 2) 
plt.plot(time, Ytest_pre/max(Ytest_pre));
plt.title("Output Test Signal", fontsize=10);

sampling_rate, audio_data = read_wav_file('Waveform12.wav')

plt.subplot(3, 1, 3) 
plt.plot(time, audio_data[0:length_in_samples]/max(audio_data[0:length_in_samples]))
plt.xlabel("Time in secs")
plt.title("Output Test Signal of CWAFx", fontsize=10);

plt.savefig('DatabaseSample12.png', dpi=300, bbox_inches='tight')

write_wav_file("Waveform12_In.wav", 16000, Xtest_pre/max(Xtest_pre))
write_wav_file("Waveform12_Out.wav", 16000, Ytest_pre/max(Ytest_pre))
write_wav_file("Waveform12_Model.wav", 16000, audio_data[0:length_in_samples]/max(audio_data[0:length_in_samples]))


In [None]:
print("Input - Training")
Audio(data=Xtest_pre.T, rate=16000)

In [None]:
print("Output")
Audio(data=Ytest_pre.T, rate=16000)

In [None]:
print("Output of Model")
Audio(data=audio_data, rate=16000)


In [None]:
os.chdir('C:/Users/yavuz/Musiccodes/MyCAFModels/Output/')

print('CWAFx Evaluating ')

main_idx = 12


Xtest_pre = nofx_2_array_test[main_idx][0:length_in_samples] + nofx_2_array_test[main_idx+10][0:length_in_samples] + nofx_2_array_test[main_idx+20][0:length_in_samples] + nofx_2_array_test[main_idx+30][0:length_in_samples]
Ytest_pre = reverb_2_array[main_idx][0:length_in_samples] + reverb_2_array[main_idx+10][0:length_in_samples] + reverb_2_array[main_idx+20][0:length_in_samples] + reverb_2_array[main_idx+30][0:length_in_samples]


Xtest = Xtest_pre.reshape(1,length_in_samples,1)
Ytest = Ytest_pre.reshape(1,length_in_samples,1)
#print("Xtest: ", Xtest.shape, " Ytest: ", Ytest.shape)

# zero pad at the end as well. 
Xtest = utils.cropAndPad(Xtest, crop = 0, pad = kContext*winLength//2)
Ytest = utils.cropAndPad(Ytest, crop = 0, pad = kContext*winLength//2)


kLen = Xtest.shape[1]
kBatch = int((kLen/(winLength//2)) + 1)

testGen = GeneratorContext(Xtest, Ytest, kContext, winLength, winLength//2)
metrics = {}
mae = []
mfcc_cosine = []
mse_y = []
mse_z = []

#model.load_weights("C:\Users\yavuz\Musiccodes\MyCAFModels\Models\Model1.weights.h5", by_name=True) 

for idx in range(Xtest.shape[0]):
        print(idx)

        x = testGen[idx][0]
        Z = model_CWAFx.predict(x, batch_size=kBatch)
        Z_m = Z[:,:,0]
        Ztest_waveform = overlap(Z_m, kLen, winLength, winLength//2, windowing=True, rate=2)
        
        Ytest_waveform = Ytest[idx].reshape(Ytest[idx].shape[0])

        # Convert to 16-bit integers
        audio_data = (Ztest_waveform * 32767).astype(np.int16)
        
        wavfile.write('Waveform12_22_32_42.wav', 16000, audio_data)
        mae.append(utils.getMAEnormalized(Ytest_waveform, Ztest_waveform))
        d = utils.getMSE_MFCC(Ytest_waveform, Ztest_waveform, kSR, mean_norm=False)    
        mfcc_cosine.append(d['cosine'])
        ms, e_y, _ = getMP(Ytest_waveform, kLen)
        ms, e_z, _ = getMP(Ztest_waveform, kLen)
        mse_y.append(e_y)
        mse_z.append(e_z)

#d = utils.getDistances(np.asarray(mse_y), np.asarray(mse_z))    
metrics['mae'] = round(np.mean(mae), 5)
metrics['mfcc_cosine'] = round(np.mean(mfcc_cosine), 5)
metrics['msed'] = round(np.mean(d['euclidean']), 5)
    
for metric in metrics.items():
    print(metric)
        
#with open('./' + 'CWAFx_metrics.json', 'w') as outfile:
#    json.dump(metrics, outfile)

print("MSE_y", mse_y)
 
print('Evaluation finished.')

In [None]:
time = np.linspace(0, 4, 16000*4)

plt.figure(figsize=(12,12))
plt.subplot(3, 1, 1) 
plt.plot(time, Xtest_pre/max(Xtest_pre));
plt.title("Input Test Signal", fontsize=10)


plt.subplot(3, 1, 2) 
plt.plot(time, Ytest_pre/max(Ytest_pre));
plt.title("Output Test Signal", fontsize=10);


sampling_rate, audio_data = read_wav_file('Waveform12_22_32_42.wav')

plt.subplot(3, 1, 3) 
plt.plot(time, audio_data[0:length_in_samples]/max(audio_data[0:length_in_samples]))
plt.xlabel("Time in secs")
plt.title("Output Test Signal of CWAFx", fontsize=10);

plt.savefig('DatabaseSample12_22_32_42.png', dpi=300, bbox_inches='tight')

write_wav_file("Waveform12_22_32_42_In.wav", kSR, Xtest_pre/max(Xtest_pre))
write_wav_file("Waveform12_22_32_42_Out.wav", kSR, Ytest_pre/max(Ytest_pre))
write_wav_file("Waveform12_22_32_42_Model.wav", kSR , audio_data[0:192000]/max(audio_data[0:192000]))


write_wav_file("Waveform12_In.wav", 16000, Xtest_pre/max(Xtest_pre))
write_wav_file("Waveform12_Out.wav", 16000, Ytest_pre/max(Ytest_pre))
write_wav_file("Waveform12_Model.wav", 16000, audio_data[0:length_in_samples]/max(audio_data[0:length_in_samples]))


In [None]:
print("NoFX")
Audio(data=Xtest_pre.T, rate=16000)


In [None]:
print("With Reverb")
Audio(data=Ytest_pre.T, rate=16000)

In [None]:
print("Output of Model")
Audio(data=audio_data, rate=16000)

In [None]:
audio_length = 24

with wave.open('C:/Users/yavuz/Musiccodes/MyCAFModels/data/DT1mono.wav', 'rb') as wav_file:
    # Extract parameters
    n_channels = wav_file.getnchannels()
    sampwidth = wav_file.getsampwidth()
    sampling_rate = wav_file.getframerate()
    n_frames = wav_file.getnframes()

    # Read the frames
    frames = wav_file.readframes(n_frames)

# Convert the frame data to an array of integers
audio_data = np.frombuffer(frames, dtype=np.int16)

# If stereo, split channels
if n_channels == 2:
    audio_data = audio_data.reshape(-1, 2)

audio_data_s = audio_data[0:audio_length*sampling_rate]
audio_data_s = audio_data_s / 32267

print("Sampling Rate", sampling_rate)

time = np.linspace(0, audio_length, audio_length * sampling_rate)
print("Sampling rate: ", sampling_rate)
print("Length : ", n_frames/sampling_rate, "seconds")
plt.figure(figsize=(12,4))
plt.plot(time, audio_data_s);
plt.xlabel("Time in secs")
Audio(data=audio_data_s, rate=sampling_rate)

In [None]:
frame_duration = 2
audio_data_s_resampled = resampy.resample(audio_data_s, sr_orig=44100, sr_new=48000)
number_of_frames = int(len(audio_data_s_resampled)/(frame_duration*48000))
print("Number of Frames", number_of_frames)
audio_data_s_2d = audio_data_s_resampled.reshape(number_of_frames, frame_duration*48000)

In [None]:
audio_data_s_2d.shape

In [None]:
frame_10_down = resampy.resample(audio_data_s_2d[10,:], sr_orig=48000, sr_new=16000)

In [None]:
plt.figure(figsize=(12,4))
plt.plot(frame_10_down);
Audio(data=frame_10_down, rate=16000)

In [None]:
frame_10_down_up = resampy.resample(frame_10_down, sr_orig=16000, sr_new=48000)
plt.figure(figsize=(12,4))
plt.plot(frame_10_down_up);
Audio(data=frame_10_down_up, rate=48000)

In [None]:
zeros = np.zeros(32000)
for part_idx in range(0,number_of_frames):
    print(part_idx)
    Xtest_pre = audio_data_s_2d[part_idx]
    #print("Xtest_pre", Xtest_pre.shape)
    frame_down = resampy.resample(Xtest_pre, sr_orig=48000, sr_new=16000)
    
    Xtest_pre_padded = np.concatenate([frame_down, zeros])
    
    Xtest = Xtest_pre_padded.reshape(1,64000,1)
    Xtest = utils.cropAndPad(Xtest, crop = 0, pad = kContext*winLength//2)    # zero pad at the end as well. 

    kLen = Xtest.shape[1]
    kBatch = int((kLen/(winLength//2)) + 1)

    testGen = GeneratorContext(Xtest, Ytest, kContext, winLength, winLength//2)

    for idx in range(Xtest.shape[0]):


        x = testGen[idx][0]
        Z = model_CWAFx.predict(x, batch_size=kBatch)
        Z_m = Z[:,:,0]
        Ztest_waveform = overlap(Z_m, kLen, winLength, winLength//2, windowing=True, rate=2)
    

    frame_down_up = resampy.resample(Ztest_waveform, sr_orig=16000, sr_new=48000)
    print("Frame Down Up", frame_down_up.shape)
    
    write_wav_file("CWAFx"+'_'+str(part_idx)+'.wav', 48000, frame_down_up)
    
    #sf.write("CWAFx"+'_'+str(part_idx)+'.wav', Ztest_waveform, kSR)
 
 
print('Evaluation finished.')

In [None]:
input_reverb= np.zeros(number_of_frames*96000 + 96000)

for i in range(0,number_of_frames):
    sampling_rate, audio_data = read_wav_file('CWAFx_' + str(i) + '.wav')
    #print("Sampling_rate", sampling_rate)
    #print("Audio_data", audio_data.shape)
    input_reverb[i*96000:i*96000+192000] = audio_data[0:192000]

plt.figure(figsize=(12,4))
plt.plot(time , input_reverb[0:len(time)]);
plt.title("Output Signal of CWAFx")
plt.savefig('DreamTheater.png', dpi=300, bbox_inches='tight');
Audio(data=input_reverb, rate=48000)
sf.write("DreamTheater.wav", input_reverb[0:1152000]/max(input_reverb[0:1152000]), kSR)

Audio(data=input_reverb, rate=sampling_rate)


In [None]:
plt.figure(figsize=(12,4))
plt.plot(time , input_reverb[0:len(time)]);
plt.title("Output Signal of CWAFx")
plt.savefig('DreamTheater.png', dpi=300, bbox_inches='tight');
Audio(data=input_reverb, rate=48000)
sf.write("DreamTheater.wav", input_reverb[0:1152000]/max(input_reverb[0:1152000]), kSR)

In [None]:
tt = np.arange(0, 24, 1 / 22050)
plt.figure(figsize=(12,4))
plt.plot(tt,input_reverb);
plt.title("DreamTheater with reverberation using CWAFx") # "JohnMcLaughlin-AlDiMeola-PacoPena" "PearlJam" "DreamTheater"
sf.write("WaveformModel.wav", audio_data[0:192000]/max(audio_data[0:192000]), kSR)
Audio(data=input_reverb, rate=22050)

In [None]:
sf.write("CWAFx"+'Song' + '.wav', input_reverb, 22050)

In [None]:
pwd

In [None]:
os.chdir("C:/Users/YavuzBURUKPEAKUP/Kods/MyCAFModels/Models")

In [None]:
models = os.listdir()

In [None]:
models[0]

In [None]:
models

In [None]:
for i in range(20):
    print(models[i][12:-12])