# Projet simulation logicielle
* *Rhouch Oussama*
* *Cherki Inssaf*

<img src="figure/model based.png" alt="CS" style="width: 750px;"/>

## Importing libraries

In [1]:
import os
import librosa
import librosa.display
import numpy as np
import soundfile as sf
from scipy.io.wavfile import write
from sklearn import preprocessing
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import zipfile
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchsummary import summary
from torch.utils.data import DataLoader, Dataset
import warnings
import random
import logging
torch.cuda.empty_cache()
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO)

In [2]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device

## Performing preprocessing on the data

##### Unzip the data

In [3]:
zip_file_paths = ['data/data_part1.zip', 'data/data_part2.zip', 'data/data_part3.zip', 'data/data_part4.zip']

for zip_file_path in zip_file_paths:
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(".")

##### Data folders

In [4]:
noise_folder = "data/noise/"
clean_folder = "data/clean/dev-clean"
output_folder = "data/noisy/"
denoising_folder = "data/denoising/"

# Create the output folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)
os.makedirs(denoising_folder, exist_ok=True)

### Clean audio data

##### Load clean audio data

In [5]:
# Get the list of clean files
clean_path = []
for folder in os.listdir(clean_folder):
    folder_path = os.path.join(clean_folder, folder)
    if os.path.isdir(folder_path):
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                if file.endswith(".flac"):
                    file_path = os.path.join(root, file)
                    clean_path.append(file_path)

### Noise audio data

##### Load noise audio data

In [6]:
# Get the list of noise files
noise_file = ""
if os.path.isdir(noise_folder):
        for root, dirs, files in os.walk(noise_folder):
            for file in files:
                if file.endswith(".wav"):
                    noise_file = os.path.join(root, file)

### Functions

In [7]:
def alpha(RSB):
    return np.sqrt(10**(RSB/10))

In [8]:
def make_noisy(clean_file, noise_file, output_file, RSB_range=(3, 9)):
    s = clean_file
    u, sr = librosa.load(noise_file, sr=None)
    u = np.tile(u, int(np.ceil(len(s) / len(u))))[:len(s)]  # Handle variable audio lengths

    s_tf = np.fft.fft(s)
    u_tf = np.fft.fft(u)

    RSB_value = 6
    alpha_value = alpha(RSB_value)

    x_tf = s_tf + alpha_value * u_tf

    x = np.fft.ifft(x_tf)
    x = x.astype(np.float32)
    x = x / np.max(np.abs(x))  # Normalization

    sf.write(output_file, x, sr)

    return x


def load_audio(file_path):
    audio, sr = librosa.load(file_path, sr=None)
    audio = librosa.effects.pitch_shift(audio, sr, n_steps=random.uniform(-1, 1))  # Pitch shift augmentation
    return audio

### Preprocessing

In [9]:
class SpeechDataset(Dataset):
    def __init__(self):
        self.clean_files = []
        self.noisy_files = []
        self.noise = []
        self.original_length = []
        self.i = 0
        
        for clean_file in clean_path[:10]:
            s, _ = librosa.load(clean_file, sr=None)
            self.clean_files.append(torch.tensor(s))
            self.original_length.append(len(s))
        
        self.max_len = max(self.original_length)
        u, _ = librosa.load(noise_file, sr=None)
        self.noise = u[:self.max_len]
        
        for i in range(len(self.clean_files)):
            self.clean_files[i] = F.pad(self.clean_files[i], (0, self.max_len - len(self.clean_files[i])))
        
        for clean_file in self.clean_files:
            output_path = f"{output_folder}noisy_{self.i}.wav"
            x = make_noisy(clean_file, noise_file, output_path)
            
            self.noisy_files.append(x)
            
            self.i += 1
            
        for i in range(len(self.clean_files)):
            s = self.clean_files[i]
            x = self.noisy_files[i]
            
            self.clean_files[i] = torch.tensor(np.abs(np.fft.fft(s))).to("cpu")
            self.noisy_files[i] = torch.tensor(np.abs(np.fft.fft(x))).to("cpu")
            
    def __len__(self):
        return len(self.clean_files)
    
    def __getitem__(self, idx):
        try:
            noisy, clean = self.noisy_files[idx], self.clean_files[idx]
        except IndexError:
            logging.error(f"Index {idx} out of range.")
            raise

        return noisy, clean

In [10]:
dataset = SpeechDataset()

In [11]:
input_size = len(dataset.__getitem__(0)[0])
input_size

470400

In [12]:
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

In [13]:
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=True)

In [14]:
class MLP(nn.Module):
    def __init__(self, input_size):
        super(MLP, self).__init__()
        # Define the layers
        self.fc1 = nn.Linear(input_size, 1024) # First hidden layer
        self.fc2 = nn.Linear(1024, 512)        # Second hidden layer
        self.fc3 = nn.Linear(512, 256)         # Third hidden layer
        self.fc4 = nn.Linear(256, input_size)  # Output layer

        # Activation function
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.relu(self.fc3(x))
        x = self.fc4(x)
        return x

In [15]:
def compute_mask(noisy_fft, clean_fft, eps=1e-8):
    # Compute the mask as the ratio of clean to noisy magnitudes
    mask = clean_fft / (noisy_fft + eps)
    
    # Ensure the mask values are between 0 and 1
    mask = np.clip(mask, 0, 1)
    
    return mask


In [16]:
MLP_model = MLP(input_size)

In [17]:
# Define the loss function
criterion = nn.MSELoss()  # Mean Squared Error Loss

# Define the optimizer
optimizer = optim.Adam(MLP_model.parameters(), lr=0.001)

INFO:transformers.file_utils:PyTorch version 2.1.0 available.
INFO:transformers.modeling_xlnet:Better speed can be achieved with apex installed from https://www.github.com/nvidia/apex .


In [18]:
summary(MLP_model, (input_size,))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                 [-1, 1024]     481,690,624
              ReLU-2                 [-1, 1024]               0
            Linear-3                  [-1, 512]         524,800
              ReLU-4                  [-1, 512]               0
            Linear-5                  [-1, 256]         131,328
              ReLU-6                  [-1, 256]               0
            Linear-7               [-1, 470400]     120,892,800
Total params: 603,239,552
Trainable params: 603,239,552
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 1.79
Forward/backward pass size (MB): 3.62
Params size (MB): 2301.18
Estimated Total Size (MB): 2306.59
----------------------------------------------------------------


In [19]:
num_epochs = 10  # Replace with the number of epochs you desire
log_interval = 10
# Training loop
for epoch in range(num_epochs):
    for batch_idx, (noisy, clean) in enumerate(train_loader):
        noisy = noisy.float()
        clean = clean.float()
        
        # Make sure to call zero_grad on the optimizer, not the model
        optimizer.zero_grad() 
        
        # Forward pass through the network
        predicted_mask = MLP_model(noisy)
        
        # Compute the true mask
        mask = compute_mask(noisy, clean)
        
        # Compute loss using the true mask and the predicted mask
        loss = criterion(predicted_mask, mask)
        
        # Backward pass and optimize
        loss.backward()  
        optimizer.step()  
        
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(noisy)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")



In [25]:
def validate(model, loader, criterion):
    model.eval()  # Set the model to evaluation mode
    validation_loss = 0

    with torch.no_grad():  # No need to track gradients for validation
        for noisy, clean in loader:
            noisy = noisy.float()
            clean = clean.float()

            # Forward pass through the network
            predicted_mask = model(noisy)

            # Compute the true mask
            true_mask = compute_mask(noisy, clean)

            # Compute loss using the true mask and the predicted mask
            loss = criterion(predicted_mask, true_mask)

            # Accumulate the validation loss
            validation_loss += loss.item()

    # Compute the average loss
    validation_loss /= len(loader.dataset)
    return validation_loss

In [26]:
# Perform the validation
val_loss = validate(MLP_model, test_loader, criterion)
print(f'Validation Loss: {val_loss:.6f}')

Validation Loss: 0.216946


In [28]:
def denoise_audio(model, noisy, sr=16000):
    # Compute the predicted mask
    noisy = noisy.float()
    noisy = F.pad(noisy, (0, self.max_len - len(noisy)))
    predicted_mask = model(noisy)

    # Convert the predicted mask from PyTorch Tensor to numpy array and transpose it
    predicted_mask = predicted_mask.detach().numpy().T

    # Apply the mask to the noisy audio data
    denoised_audio = noisy * predicted_mask

    # Compute the magnitude of the denoised audio
    denoised_mag = np.abs(np.fft.fft(denoised_audio))

    # Compute the phase of the noisy audio
    noisy_phase = np.angle(np.fft.fft(noisy))

    # Combine the magnitude and phase
    denoised_complex = denoised_mag * np.exp(1j * noisy_phase)

    # Compute the inverse FFT and convert the result to a real number
    denoised_audio = np.real(np.fft.ifft(denoised_complex))

    # Cast to float32
    denoised_audio = denoised_audio.astype(np.float32)

    # Scale to the range [-1, 1]
    denoised_audio /= np.max(np.abs(denoised_audio))

    return denoised_audio

def denoise_file(model, input_file, output_file, sr=16000):
    # Load the noisy audio file
    noisy, sr = librosa.load(input_file, sr=sr)

    # Pad the noisy audio file so that it is divisible into 2 second chunks
    noisy = F.pad(torch.tensor(noisy), (0, 2 * sr - len(noisy) % (2 * sr)))

    # Split the noisy audio file into 2 second chunks
    noisy_chunks = torch.split(noisy, 2 * sr)

    # Denoise each chunk
    denoised_chunks = []
    for chunk in noisy_chunks:
        denoised_chunks.append(denoise_audio(model, chunk, sr=sr))

    # Combine the denoised chunks into a single audio file
    denoised_audio = torch.cat(denoised_chunks)

    # Save the denoised audio file
    write(output_file, sr, denoised_audio.numpy())
    
    return denoised_audio.numpy()

denoise_file(MLP_model, "data/noisy/noisy_0.wav", "data/denoising/denoised_0.wav")

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x32000 and 470400x1024)