Project Source Separation

# Import 

In [1]:
import os

import json5
import numpy as np
import torch
from torch.utils.data import DataLoader
from util.utils import initialize_config

# I - Entraînement du Wave U-Net

### Chargement des configurations

In [2]:
# Config

config_path = './config/train/train_sep.json'
resume = False  # forwarded to the trainer: new training run vs. resuming one

config_path = os.path.abspath(config_path)
# Use a context manager so the config file handle is closed after parsing.
with open(config_path) as config_file:
    configuration = json5.load(config_file)

# os.path.splitext() returns a (root, ext) tuple; keep only the root so the
# experiment name is a plain string (e.g. "train_sep") and not a tuple.
configuration['experiment_name'] = os.path.splitext(os.path.basename(config_path))[0]
configuration["config_path"] = config_path



TODO : ajouter ici un exemple de fichier de configuration.

In [3]:
config = configuration

# --- Data loaders -----------------------------------------------------------
# Training loader: dataset and loader options both come from the config.
train_dataset = initialize_config(config["train_dataset"])
train_loader_options = config["train_dataloader"]
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=train_loader_options["batch_size"],
    shuffle=train_loader_options["shuffle"],
    pin_memory=train_loader_options["pin_memory"],
)

# Validation loader: one item per batch, one worker process.
validation_dataset = initialize_config(config["validation_dataset"])
valid_dataloader = DataLoader(dataset=validation_dataset, num_workers=1, batch_size=1)

# --- Model, optimizer and loss ----------------------------------------------
model = initialize_config(config["model"])

optimizer_options = config["optimizer"]
adam_betas = (optimizer_options["beta1"], optimizer_options["beta2"])
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=optimizer_options["lr"],
    betas=adam_betas,
)

loss_function = initialize_config(config["loss_function"])

# --- Trainer ----------------------------------------------------------------
# pass_args=False: obtain the trainer class itself so it can be instantiated
# here with the objects built above.
trainer_class = initialize_config(config["trainer"], pass_args=False)

trainer_kwargs = dict(
    config=config,
    resume=resume,
    model=model,
    loss_function=loss_function,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    validation_dataloader=valid_dataloader,
)
trainer = trainer_class(**trainer_kwargs)

trainer.train()

KeyboardInterrupt: 

# Inference

In [20]:

import json
import soundfile as sf

## Config
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
configinfer_path = './config/infer/infer_sep.json'
# Use a context manager so the config file handle is closed after parsing.
with open(configinfer_path) as config_file:
    config = json.load(config_file)


model_checkpoint_path = './checkpoint_infer/best_model_2.tar'
output_dir = './result/test_2/'
# Create the output directory when it is missing instead of asserting on it:
# asserts are stripped under `python -O`, and a fresh checkout should not need
# a manual mkdir before this notebook can run.
os.makedirs(output_dir, exist_ok=True)




In [21]:
"""
DataLoader
"""
# Pick the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
# batch_size=1: the loader yields one dataset item per iteration.
dataloader = DataLoader(dataset=initialize_config(config["dataset"]), batch_size=1)

'\n'

In [22]:
import argparse
import json
import os

import librosa
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

from util.utils import initialize_config, load_checkpoint

"""
Parameters
"""




# Build the model, restore the checkpoint weights and switch to eval mode.
model = initialize_config(config["model"])
model.load_state_dict(load_checkpoint(model_checkpoint_path, device))
model.to(device)
model.eval()

# Separation: every mixture is cut into fixed-length chunks, each chunk is run
# through the model, and the two model outputs are re-assembled and saved.
sample_length = config["custom"]["sample_length"]  # loop-invariant chunk size
output_sample_rate = 8000  # sample rate written to the output WAV files

# Inference only: without autograd no computation graph is kept per chunk.
with torch.no_grad():
    for mixture, name in tqdm(dataloader):
        # With batch_size=1 the default collation is expected to wrap the name
        # in a length-1 sequence; unwrap it so the file name is the bare name
        # rather than the container's repr. (Assumes the dataset yields the
        # name as a string; verify against the dataset class.)
        if not isinstance(name, str):
            name = name[0]

        mixture = mixture.to(device)  # [1, 1, T]

        # The input of the model should be fixed length: zero-pad the tail up
        # to the next multiple of sample_length (0 when already aligned).
        padded_length = (-mixture.size(-1)) % sample_length
        if padded_length:
            mixture = torch.cat(
                [mixture, torch.zeros(1, 1, padded_length, device=device)], dim=-1
            )

        assert mixture.size(-1) % sample_length == 0 and mixture.dim() == 3

        enhanced_chunks = []
        noised_chunks = []
        for chunk in torch.split(mixture, sample_length, dim=-1):
            # Run the model once per chunk and reuse both outputs:
            # index 0 is saved as the "enhanced" signal, index 1 as the "noise".
            outputs = model(chunk)
            enhanced_chunks.append(outputs[0].detach().cpu())
            noised_chunks.append(outputs[1].detach().cpu())

        enhanced = torch.cat(enhanced_chunks, dim=-1)  # [1, 1, T]
        noise_sep = torch.cat(noised_chunks, dim=-1)  # [1, 1, T]

        # Drop the samples that correspond to the zero padding added above.
        if padded_length:
            enhanced = enhanced[:, :, :-padded_length]
            noise_sep = noise_sep[:, :, :-padded_length]

        enhanced = enhanced.reshape(-1).numpy()
        noise_sep = noise_sep.reshape(-1).numpy()

        output_path_enhanced = os.path.join(output_dir, f"{name}_enhanced.wav")
        output_path_noise = os.path.join(output_dir, f"{name}_noise.wav")
        sf.write(output_path_enhanced, enhanced, output_sample_rate)
        sf.write(output_path_noise, noise_sep, output_sample_rate)

  model_checkpoint = torch.load(checkpoint_path, map_location=device)


Loading ./checkpoint_infer/best_model_2.tar, epoch = 500.


100%|██████████| 400/400 [00:43<00:00,  9.29it/s]


In [None]:
noise_path = r"Audio/source_separation/train_small/0011/noise.wav"
voice_path = r"Audio/source_separation/train_small/0011/voice.wav"


import librosa
import numpy as np
import soundfile as sf

# `import scipy` alone does not guarantee that the `scipy.io.wavfile`
# submodule is loaded; import the submodule explicitly.
import scipy.io.wavfile

# Load the audio files: each read returns (sample_rate, samples).
# `sr` ends up holding the sample rate of the noise file (last assignment).
clean, sr = scipy.io.wavfile.read(os.path.abspath(voice_path))
noise, sr = scipy.io.wavfile.read(os.path.abspath(noise_path))




IndexError: tuple index out of range

In [19]:
import numpy as np

import scipy.io.wavfile

# Load the audio files: each read returns (sample_rate, samples).
# `sr` ends up holding the sample rate of the noise file (last assignment).
clean, sr = scipy.io.wavfile.read(os.path.abspath(voice_path))
noise, sr = scipy.io.wavfile.read(os.path.abspath(noise_path))

# Work in float64: wavfile.read returns integer arrays for integer-PCM files,
# and squaring integer samples overflows, which corrupts the RMS values.
clean = clean.astype(np.float64)
noise = noise.astype(np.float64)

# Trim both signals to their common length so they can be added sample by
# sample even when the two files differ in duration.
common_length = min(len(clean), len(noise))
clean = clean[:common_length]
noise = noise[:common_length]

# Compute the RMS (root mean square) level of each signal.
rms_clean = np.sqrt(np.mean(clean**2))
rms_noise = np.sqrt(np.mean(noise**2))

# Compute the scaling factor that yields the desired SNR (in dB):
# 20 * log10(rms_clean / (scaling_factor * rms_noise)) == snr.
snr = -1
scaling_factor = rms_clean / (10**(snr / 20) * rms_noise)

# Apply the scaling factor to the noise.
scaled_noise = noise * scaling_factor

# Create the mixture signal.
mixture = clean + scaled_noise

# Save the mixture file. Clip to the int16 range before casting: a plain
# astype(np.int16) wraps around on overflow, producing loud artefacts.
int16_range = np.iinfo(np.int16)
mixture_int16 = np.clip(mixture, int16_range.min, int16_range.max).astype(np.int16)
output_mixture_path = 'mixture.wav'
scipy.io.wavfile.write(output_mixture_path, sr, mixture_int16)

IndexError: tuple index out of range