<a href="https://colab.research.google.com/github/jakobatgithub/unreverb/blob/main/ConvolutionAndDeconvolution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install rir_generator

In [None]:
import numpy as np
import scipy.signal as ss
import soundfile as sf
import rir_generator as rir
from IPython.display import Audio 
from IPython.core.display import display
import matplotlib.pyplot as plt

# A script to generate the Impulse Response (IR) Functions of a rectangular room
# of a given size with given receiver and source positions and given Reverberation time T60.

# https://github.com/audiolabs/rir-generator
# pip install rir-generator
# pip install SoundFile

#signal, fs = sf.read("bark.wav", always_2d=True)

def my_convolution(IR_data, signal):
  # be careful with the order of IR_data and signal!
  #return ss.convolve(IR_data, signal, mode='full')
  #return ss.convolve(signal, IR_data, mode='same')
	return ss.convolve(signal, IR_data, mode='valid')
 
def render_history(history):
    plt.plot(history["loss"], label="loss")
    plt.plot(history["val_loss"], label="val_loss")
    plt.legend()
    plt.title("Our losses")
    plt.show()
    plt.close()

#samplerate = 2**13
samplerate = 8000

#reverberation_times = [0.4, 0.6, 0.8]
reverberation_times = [0.5]
IR_datas = []
idx = 0
for reverberation_time in reverberation_times:
    IR_data = rir.generate(
    	c=340,                  # Sound velocity (m/s)
    	fs=samplerate,                  # Sample frequency (samples/s)
    	r=[                     # Receiver position(s) [x y z] (m)
    	    [2.5, 2, 1.5]
    	],
    	s=[3.0, 2, 1.5],          # Source position [x y z] (m)
    	L=[7, 5, 3],            # Room dimensions [x y z] (m)
    	reverberation_time=reverberation_time, # Reverberation time T60 (s)
    	nsample=2*samplerate,           # Number of output samples
	)
    print(IR_data.shape)              # (4096, 3)
    IR_datas.append(IR_data)
    #sf.write('random_IRs/IRF_'+str(idx)+'.wav', IR_data, samplerate)
    idx = idx +1	

#print(signal.shape)         # (11462, 2)

# Convolve 2-channel signal with 3 impulse responses
#signal = ss.convolve(h[:, None, :], signal[:, :, None])

#print(signal.shape)         # (15557, 2, 3)

In [None]:
N = 8*samplerate
batch = 10
split = batch
IR_data = IR_datas[0].T[0]
signal = np.random.randn(batch*N)
signals = np.array(np.array_split(signal, split))
transformed_signals = []
for signal in signals:
  transformed_signal = my_convolution(IR_data, signal)
  transformed_signals.append(transformed_signal)

transformed_signals = np.array(transformed_signals)
print(signal.shape)
print(signals.shape)
print(transformed_signal.shape)
print(transformed_signals.shape)
display(Audio(np.array(signals[0]), rate=samplerate, autoplay=False))
display(Audio(np.array(transformed_signals[0]), rate=samplerate, autoplay=False))

In [None]:
plt.plot(signals[0])
plt.show()

In [None]:
plt.plot(transformed_signals[0])
plt.show()

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# load data
# path of the tensorflow dataset 
path = '/content/drive/MyDrive/dsr_project/data/but-czas_v1.0/wavs/'

# load the tensorflow dataset
dataset_F = tf.data.experimental.load(path + "tf_F_2_randomIRFs_dataset")
dataset_M = tf.data.experimental.load(path + "tf_M_2_randomIRFs_dataset")
dataset = dataset_F.concatenate(dataset_M)

# determine size of the dataset
dataset_size = sum(1 for _ in dataset)
print(f"dataset_size: {dataset_size}")

def lambda_1(features, labels, targets):
  return (features, labels)

def lambda_2(features, labels, targets):
  return (features, targets)

def lambda_3(features, labels, targets):
  return labels

# obtain the datasets containing only labels or targets
label_dataset = dataset.map(lambda features, labels, targets: lambda_1(features, labels, targets))
target_dataset = dataset.map(lambda features, labels, targets: lambda_2(features, labels, targets))

# obtain all labels
all_labels = dataset.map(lambda features, labels, targets: lambda_3(features, labels, targets))

# transform all_labels dataset to numpy array
all_labels_np = []
for label in all_labels:
  all_labels_np.append(tfds.as_numpy(label))

all_labels_np = np.array(all_labels_np)

# determine unique labels
unique_labels, counts = np.unique(all_labels_np, return_counts=True)
print(f"unique_labels: {unique_labels}")
#print(counts)

# determine number of unique labels
nr_of_unique_labels = unique_labels.shape[0]
print(f"nr_of_unique_labels: {nr_of_unique_labels}")

# shuffle the dataset before splitting in train and validate!
label_dataset = label_dataset.shuffle(dataset_size)
dataset = dataset.shuffle(dataset_size)

# split dataset in train and validate
train_fraction = 0.8
validate_dataset_size = int(dataset_size * (1.0-train_fraction)) # 20 percent of dataset_size
#train_dataset = label_dataset.skip(validate_dataset_size)
#validate_dataset = label_dataset.take(validate_dataset_size)

# load response functions
ir_path = '/content/drive/My Drive/dsr_project/data/random_IRs/'
responses_dataset = tf.data.experimental.load(ir_path + "randomIRFs")

In [None]:
X0 = []
y0 = []
for feature, label, target in dataset:
  label = label.numpy()
  if label == 3:
    a = feature.numpy()[0]
    a_max = np.max([abs(np.max(a)), abs(np.min(a))])
    a = a / a_max
    b = target.numpy()[0]
    b_max = np.max([abs(np.max(b)), abs(np.min(b))])
    b = b / b_max
    X0.append(a)
    y0.append(b)

split_idx = int(0.8*len(X0))

X = np.array(X0)
y = np.array(y0)
X_train = np.array(X0[:split_idx])
y_train = np.array(y0[:split_idx])
X_validate = np.array(X0[split_idx + 1:])
y_validate = np.array(y0[split_idx + 1:])

In [None]:
#X_train = np.array(y0[:split_idx])
#y_train = np.array(X0[:split_idx])
#X_validate = np.array(y0[split_idx + 1:])
#y_validate = np.array(X0[split_idx + 1:])

In [None]:
for feature, label, target in dataset.shuffle(dataset_size).take(1):
  print(feature.numpy().shape)
  display(Audio(feature.numpy()[0], rate=samplerate, autoplay=False))
  print(target.numpy().shape)
  display(Audio(target.numpy()[0], rate=samplerate, autoplay=False))

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Convolution1D

model = Sequential([Dense(y.shape[1], input_dim=X.shape[1], use_bias=False, activation=None)])
#model = Sequential()
#model.add(Dense(10000, input_dim=X.shape[1], use_bias=False, activation=None))
#model.add(Dense(y.shape[1], use_bias=False, activation=None))
model.summary()

In [None]:
#model.compile(loss='mean_squared_error', optimizer='adam')
model.compile(loss='cosine_similarity', optimizer='adam')
history = model.fit(X_train, y_train, epochs=200, batch_size=100, verbose='auto', validation_data=(X_validate, y_validate), shuffle=True)
print("\n")
print(f"history.history['loss'][-1]: {history.history['loss'][-1]}")
print(f"history.history['val_loss'][-1]: {history.history['val_loss'][-1]}")
render_history(history.history)

In [None]:
idx = 52
deconvolved = model.predict(X_validate)
ass = []
for a in deconvolved:
    a_max = np.max([abs(np.max(a)), abs(np.min(a))])
    a = a / a_max
    ass.append(a)

deconvolved = np.array(ass)
display(Audio(y_validate[idx], rate=samplerate, autoplay=False))
display(Audio(X_validate[idx], rate=samplerate, autoplay=False))
display(Audio(deconvolved[idx], rate=samplerate, autoplay=False))
plt.plot(deconvolved[idx], 'b')
plt.plot(y_validate[idx], 'r')
plt.show()

In [None]:
!pip install noisereduce

In [None]:
import noisereduce as nr
deconvolved_filtered = nr.reduce_noise(y=deconvolved[idx], sr=samplerate)
display(Audio(deconvolved[idx], rate=samplerate, autoplay=False))
display(Audio(deconvolved_filtered, rate=samplerate, autoplay=False))
plt.plot(deconvolved[idx], 'b')
plt.plot(deconvolved_filtered, 'r')
plt.show()

In [None]:
plt.plot(X_validate[idx], 'b')
plt.plot(y_validate[idx], 'r')
plt.show()

In [None]:
plt.plot(y_validate[idx]-deconvolved[idx], 'r')
plt.show()

In [None]:
idx = 150
deconvolved = model.predict(X_train)
ass = []
for a in deconvolved:
    a_max = np.max([abs(np.max(a)), abs(np.min(a))])
    a = a / a_max
    ass.append(a)

deconvolved = np.array(ass)
display(Audio(y_train[idx], rate=samplerate, autoplay=False))
display(Audio(X_train[idx], rate=samplerate, autoplay=False))
display(Audio(deconvolved[idx], rate=samplerate, autoplay=False))
plt.plot(deconvolved[idx], 'b')
plt.plot(y_train[idx], 'r')
plt.show()

In [None]:
from numpy.fft import fft, ifft, ifftshift

lambd_est = 1e-3
fft(signal)
def wiener_deconvolution(signal, kernel, lambd):
  "lambd is the SNR"
  kernel = np.hstack((kernel, np.zeros(len(signal) - len(kernel)))) # zero pad the kernel to same length
  H = fft(kernel)
  Y = fft(signal)
  S = np.abs(fft(signal))**2
  #deconvolved = np.real(ifft(Y*np.conj(H)*S/(H*np.conj(H)*S + lambd**2)))
  GY = Y*np.conj(H)/(H*np.conj(H) + lambd**2)
  deconvolved = np.real(ifft(GY))
  return deconvolved

In [None]:
#start_idx = np.max(np.where(IR_data == 0.0)[0]) + 1
#print(start_idx)
ir_data = IR_data
ir_max = np.max([abs(np.max(ir_data)), abs(np.min(ir_data))])
#ir_data = ir_data / ir_max
for features, targets in target_dataset.take(1):
  signal = targets.numpy()[0]
  signal_max = np.max([abs(np.max(signal)), abs(np.min(signal))])
  signal = signal / signal_max
  print(f"signal.shape: {signal.shape}, max: {np.max(signal)}, min: {np.min(signal)}")
  print(f"ir_data.shape: {ir_data.shape}, max: {np.max(ir_data)}, min: {np.min(ir_data)}")
  convolved_signal = ss.convolve(IR_data, signal, mode='valid')
  convolved_signal_max = np.max([abs(np.max(convolved_signal)), abs(np.min(convolved_signal))])
  convolved_signal = convolved_signal / convolved_signal_max
  print(f"convolved_signal.shape: {convolved_signal.shape}, max: {np.max(convolved_signal)}, min: {np.min(convolved_signal)}")


#deconvolved_signal, residue = ss.deconvolve(signal, ir_data[start_idx:])
#deconvolved_signal = wiener_deconvolution(signal, ir_data[start_idx:],  lambd=1e-0)  
deconvolved_signal = wiener_deconvolution(signal, ir_data,  lambd = 1.0*1e-0)
deconvolved_signal_max = np.max([abs(np.max(deconvolved_signal)), abs(np.min(deconvolved_signal))])
deconvolved_signal = deconvolved_signal / deconvolved_signal_max
print(f"deconvolved_signal.shape: {deconvolved_signal.shape}, max: {np.max(deconvolved_signal)}, min: {np.min(deconvolved_signal)}")
display(Audio(signal, rate=8000))
display(Audio(convolved_signal, rate=8000))  
display(Audio(deconvolved_signal, rate=8000))  

In [None]:
plt.plot(signal[19000:20000], 'b')
#plt.plot(convolved_signal[19000:20000], 'g')
plt.plot(deconvolved_signal[19000:20000], 'r')
plt.show()

In [None]:
signal_psd = np.abs(np.fft.fft(signal))**2
plt.plot(signal_psd)
plt.show()

In [None]:
signal_psd.shape

In [None]:
_, signal_psd = ss.welch(signal, samplerate)
plt.plot(signal_psd)
plt.show()

In [None]:
signal_psd.shape

In [None]:
for i in range(-10, 10, 1):
  lambd = 10.0**(i/10.0)
  deconvolved_signal = wiener_deconvolution(signal, ir_data,  lambd)  
  print(f"lambda: {lambd}, RMS: {np.sqrt(np.mean((signal - deconvolved_signal) ** 2))}")

In [None]:
import numpy as np
import librosa
import matplotlib.pyplot as plt
import rir_generator as rir
from IPython.display import Audio 
from IPython.core.display import display
from numpy.fft import fft, ifft


audiopath = '/content/drive/MyDrive/dsr_project/data/but-czas_v1.0/wavs/F-22-02/F-22-02-003.wav'
irpath = '/content/drive/MyDrive/dsr_project/data/arthur-sykes-rymer-auditorium-university-york/b-format/s1r2.wav'

sample_rate = 24000

audio, audio_sample_rate = librosa.load(audiopath, sr=sample_rate) # Downsample 24kHz
IR, IR_sample_rate = librosa.load(irpath, sr=sample_rate) # Downsample 24kHz

print(f"audio_sample_rate: {audio_sample_rate}, audio.shape: {audio.shape}, max: {np.max(audio)}, min: {np.min(audio)}")
print(f"IR_sample_rate: {IR_sample_rate}, IR.shape: {IR.shape}, max: {np.max(IR)}, min: {np.min(IR)}")
display(Audio(audio, rate=sample_rate))
display(Audio(IR, rate=sample_rate))
plt.plot(audio)
plt.show()
plt.plot(IR)
plt.show()

In [None]:
lambd_est = 1e-3

def wiener_deconvolution(signal, kernel, lambd=lambd_est):
  kernel = np.hstack((kernel, np.zeros(len(signal) - len(kernel)))) # zero pad the kernel to same length
  H = fft(kernel)
  Y = fft(signal)
  S = np.abs(fft(signal))**2
  GY = Y*np.conj(H)*S/(H*np.conj(H)*S + lambd**2)
  #GY = Y*np.conj(H)/(H*np.conj(H) + lambd**2)
  deconvolved = np.real(ifft(GY))
  return deconvolved

In [None]:
IR_padded = np.hstack((IR, np.zeros(len(audio) - len(IR))))
audio_reverb = np.real(ifft(fft(IR_padded)*fft(audio)))
deconvolved = wiener_deconvolution(audio_reverb, IR, lambd=0.0)
print(f"max: {np.max(deconvolved)}, min: {np.min(deconvolved)}")
display(Audio(audio[:2*sample_rate], rate=sample_rate))
display(Audio(audio_reverb[:2*sample_rate], rate=sample_rate))
display(Audio(deconvolved[:2*sample_rate], rate=sample_rate))
delta = audio - deconvolved
print(f"max: {np.max(delta)}, min: {np.min(delta)}, msd: {np.sqrt(np.square(delta).mean()})")
plt.plot(delta[:2*sample_rate])
plt.show()

In [None]:
mes = []
for expo in range(-10, 10, 1):
  deconvolved = wiener_deconvolution(audio_reverb, IR, lambd=10**(expo))
  mes.append([10**(expo), np.sqrt(np.square(np.subtract(deconvolved, audio)).mean())])

mes

In [None]:
IR = rir.generate(
    	c=340,                  # Sound velocity (m/s)
    	fs=sample_rate,                  # Sample frequency (samples/s)
    	r=[                     # Receiver position(s) [x y z] (m)
    	    [2.5, 2, 1.5]
    	],
    	s=[3.0, 2, 1.5],          # Source position [x y z] (m)
    	L=[7, 5, 3],            # Room dimensions [x y z] (m)
    	reverberation_time=1.2, # Reverberation time T60 (s)
    	nsample=2*sample_rate,           # Number of output samples
	)
IR = IR[:,0]

IR_padded = np.hstack((IR, np.zeros(len(audio) - len(IR))))
audio_reverb = np.real(ifft(fft(IR_padded)*fft(audio)))
deconvolved = wiener_deconvolution(audio_reverb, IR, lambd=0.0)

In [None]:
print(f"max: {np.max(deconvolved)}, min: {np.min(deconvolved)}")
display(Audio(audio[:2*sample_rate], rate=sample_rate))
display(Audio(audio_reverb[:2*sample_rate], rate=sample_rate))
display(Audio(deconvolved[:2*sample_rate], rate=sample_rate))
delta = audio - deconvolved
print(f"max: {np.max(delta)}, min: {np.min(delta)}, mse: {np.square(delta).mean()}")
plt.plot(delta[:2*sample_rate])
plt.show()

In [None]:
mes = []
for expo in range(-10, 10, 1):
  deconvolved = wiener_deconvolution(audio_reverb, IR, lambd=10**(expo))
  mes.append([10**(expo), np.sqrt(np.square(np.subtract(deconvolved, audio)).mean())])

mes