In [1]:
import numpy as np
import librosa
import soundfile as sf

def load_audio(file_path, sr=16000):
    audio, _ = librosa.load(file_path, sr=sr)
    return audio

In [2]:
def adjust_noise_length(noise, target_length):
    if len(noise) < target_length:
        repeats = int(np.ceil(target_length / len(noise)))
        noise = np.tile(noise, repeats)
    return noise[:target_length]

In [25]:
def mix_audio(clean, noise, snr_db):
    
    clean_power = np.sum(clean ** 2) / len(clean)
    noise_power = np.sum(noise ** 2) / len(noise)
    print(clean_power, noise_power)
    
    #formula for snr [snr = 10log10(ps/pn)]
    desired_noise_power = clean_power / (10 ** (snr_db / 10))
    noise = noise * np.sqrt(desired_noise_power / noise_power)
    
    return clean + noise

In [26]:
clean_audio_path = 'clean.flac'
noise_audio_path = 'noise.wav'
clean_audio = load_audio(clean_audio_path)
noise_audio = load_audio(noise_audio_path)

In [46]:
clean_audio.shape, len(clean_audio)

((43840,), 43840)

In [27]:
adjusted_noise = adjust_noise_length(noise_audio, len(clean_audio))

In [36]:
snr_db = 20
mixed_audio = mix_audio(clean_audio, adjusted_noise, snr_db)

0.0011960044394444375 3.19802467405361e-05


In [37]:
mixed_audio_path = 'mix.wav'
sf.write(mixed_audio_path, mixed_audio, samplerate=16000)

In [47]:
def adjust_noise_length(noise, target_length):
    if len(noise) < target_length:
        repeats = int(np.ceil(target_length / len(noise)))
        noise = np.tile(noise, repeats)
    return noise[:target_length]

In [None]:
import os

sr = 16000 
min_length = 4
segment_length = 4

clean_audio_dir = "clean_audios"
mixed_audio_path = "mix"
new_clean_audio_path = "clean"
datset_path = "dataset"

counter = 0
noise_audio = load_audio(noise_audio_path)
for root, dirs, files in os.walk(clean_audio_dir):
    for file in files:
        if file.endswith('.flac'):
            clean_audio_path = os.path.join(root, file)
            
            clean_audio = load_audio(clean_audio_path)
            
            if len(clean_audio) / sr < min_length:
                continue
            
            num_segments = len(clean_audio) // (segment_length * sr)
            # segments = [clean_audio[i * segment_length * sr : (i + 1) * segment_length * sr] for i in range(num_segments)]
            
            mixed_segments = []
            # for segment in segments:
            for i in range(num_segments):
                segment = clean_audio[i * segment_length * sr : (i + 1) * segment_length * sr]
                adjusted_noise = adjust_noise_length(noise_audio, len(segment))
                mixed_audio = mix_audio(segment, adjusted_noise, snr_db)
                
                #save mix and clean
                mix_segment_file_name = f"mix_{counter}.wav"
                clean_segment_file_name = f"clean_{counter}.wav"
                
                os.makedirs(os.path.join(datset_path, mixed_audio_path), exist_ok=True)
                os.makedirs(os.path.join(datset_path, new_clean_audio_path), exist_ok=True)
                
                mixed_output_audio_path = os.path.join(datset_path, mixed_audio_path, mix_segment_file_name)
                new_clean_output_audio_path = os.path.join(datset_path, new_clean_audio_path, clean_segment_file_name)
                
                
                sf.write(mixed_output_audio_path, mixed_audio, samplerate=sr)
                sf.write(new_clean_output_audio_path, segment, samplerate=sr)
                
                mixed_segments.append(mixed_audio)
                counter+=1
                

In [None]:
import os
clean_audio_dir = "clean_audios"
for s, i, j in os.walk(clean_audio_dir):
    print(s, i, j)
    print("___")
    

In [79]:
import os
import random
import shutil

def create_mix_clean_dir(dirs):
    os.makedirs(os.path.join(dirs, "mix"), exist_ok=True)
    os.makedirs(os.path.join(dirs, "clean"), exist_ok=True)
    

def distribute_dataset(dataset_path, dev_dataset_path, train_ratio=0.7, test_ratio=0.15, eval_ratio=0.15, seed=None):
    # Ensure consistent shuffling with a seed
    # random.seed(seed)
    
    mix_dir = os.path.join(dataset_path, 'mix')
    clean_dir = os.path.join(dataset_path, 'clean')
    
    mix_files = sorted(os.listdir(mix_dir))
    clean_files = sorted(os.listdir(clean_dir))
    # print(sorted(mix_files))
    # print(sorted(clean_files))
    
    # Shuffle files
    # random.shuffle(mix_files)
    # random.shuffle(clean_files)
    
    total_files = len(mix_files)
    # print(total_files)
    train_split = int(train_ratio * total_files)
    test_split = int(test_ratio * total_files)
    
    # Assign files to train, test, eval sets
    train_mix = mix_files[:train_split]
    train_clean = clean_files[:train_split]
    test_mix = mix_files[train_split:train_split + test_split]
    test_clean = clean_files[train_split:train_split + test_split]
    eval_mix = mix_files[train_split + test_split:]
    eval_clean = clean_files[train_split + test_split:]
    
    # Create train, test, eval directories
    train_dir = os.path.join(dev_dataset_path, 'train')
    test_dir = os.path.join(dev_dataset_path, 'test')
    eval_dir = os.path.join(dev_dataset_path, 'eval')
    
    create_mix_clean_dir(train_dir)
    create_mix_clean_dir(test_dir)
    create_mix_clean_dir(eval_dir)
    
    # Move files to train directory
    move_files(train_mix, mix_dir, os.path.join(train_dir, "mix"))
    move_files(train_clean, clean_dir, os.path.join(train_dir, "clean"))
    
    # Move files to test directory
    move_files(test_mix, mix_dir, os.path.join(test_dir, "mix"))
    move_files(test_clean, clean_dir, os.path.join(test_dir, "clean"))
    
    # Move files to eval directory
    move_files(eval_mix, mix_dir, os.path.join(eval_dir, "mix"))
    move_files(eval_clean, clean_dir, os.path.join(eval_dir, "clean"))

def move_files(file_list, source_dir, dest_dir):
    for file_name in file_list:
        # print(file_name)
        src = os.path.join(source_dir, file_name)
        dst = os.path.join(dest_dir, file_name)
        shutil.copy(src, dst)

# Example usage
dataset_path = 'dataset'
dev_dataset_path = "dev_dataset"
distribute_dataset(dataset_path, dev_dataset_path, train_ratio=0.7, test_ratio=0.15, eval_ratio=0.15, seed=42)


In [70]:
os.listdir("dataset/mix")

[]

In [71]:
os.path.exists('dataset/mix')

True

In [4]:
from model import UNet

In [14]:
import torch
x = torch.randn((1, 1, 128, 128)) 

In [15]:
model = UNet()

In [16]:
out = model(x)

In [17]:
out.shape

torch.Size([1, 1, 128, 128])

In [18]:
x

tensor([[[[-0.0767,  0.3060,  1.2362,  ..., -0.3088, -0.8095,  0.9545],
          [-0.0800,  0.3466, -1.1345,  ..., -0.4362,  0.1678,  1.2475],
          [ 0.2678, -0.9913, -1.5479,  ...,  0.8430, -1.6516, -0.1731],
          ...,
          [-0.4186,  0.1844,  2.0493,  ..., -0.6730,  0.2565, -1.1709],
          [-0.3757, -2.6319, -2.2088,  ...,  0.7425,  0.5840, -1.2595],
          [-0.9436,  0.4847,  1.4697,  ..., -1.2067,  0.8979,  1.1210]]]])

In [19]:
out

tensor([[[[0.1270, 0.1688, 0.7202,  ..., 0.6224, 0.2794, 0.8534],
          [0.6064, 0.5189, 0.6640,  ..., 0.2825, 0.7961, 0.4154],
          [0.8635, 0.1216, 0.8133,  ..., 0.0628, 0.9516, 0.7008],
          ...,
          [0.7414, 0.9784, 0.9175,  ..., 0.5799, 0.3556, 0.6673],
          [0.9318, 0.9971, 0.9438,  ..., 0.8805, 0.5837, 0.6974],
          [0.6387, 0.7180, 0.8289,  ..., 0.9301, 0.5585, 0.5620]]]],
       grad_fn=<TanhBackward0>)

In [6]:
import os
sorted(os.listdir(os.path.join("dev_dataset/train/", "mix")))


['mix_0.wav',
 'mix_1.wav',
 'mix_10.wav',
 'mix_100.wav',
 'mix_1000.wav',
 'mix_1001.wav',
 'mix_1002.wav',
 'mix_1003.wav',
 'mix_1004.wav',
 'mix_1005.wav',
 'mix_1006.wav',
 'mix_1007.wav',
 'mix_1008.wav',
 'mix_1009.wav',
 'mix_101.wav',
 'mix_1010.wav',
 'mix_1011.wav',
 'mix_1012.wav',
 'mix_1013.wav',
 'mix_1014.wav',
 'mix_1015.wav',
 'mix_1016.wav',
 'mix_1017.wav',
 'mix_1018.wav',
 'mix_1019.wav',
 'mix_102.wav',
 'mix_1020.wav',
 'mix_1021.wav',
 'mix_1022.wav',
 'mix_1023.wav',
 'mix_1024.wav',
 'mix_1025.wav',
 'mix_1026.wav',
 'mix_1027.wav',
 'mix_1028.wav',
 'mix_1029.wav',
 'mix_103.wav',
 'mix_1030.wav',
 'mix_1031.wav',
 'mix_1032.wav',
 'mix_1033.wav',
 'mix_1034.wav',
 'mix_1035.wav',
 'mix_1036.wav',
 'mix_1037.wav',
 'mix_1038.wav',
 'mix_1039.wav',
 'mix_104.wav',
 'mix_1040.wav',
 'mix_1041.wav',
 'mix_1042.wav',
 'mix_1043.wav',
 'mix_1044.wav',
 'mix_1045.wav',
 'mix_1046.wav',
 'mix_1047.wav',
 'mix_1048.wav',
 'mix_1049.wav',
 'mix_105.wav',
 'mix_1050.

In [15]:
mix = sorted(os.listdir(os.path.join("dev_dataset/train/", "mix")))
clean = sorted(os.listdir(os.path.join("dev_dataset/train/", "clean")))

In [23]:
counter = 0
for i, j in zip(mix, clean):
    # print(i.split("_")[1], j.split("_")[1])
    if i.split("_")[1]!=j.split("_")[1]:
        counter += 1
        

In [25]:
import librosa

In [30]:
ns, _ = librosa.load("noise.wav", sr=16000)
ns, ns.shape

(array([ 0.        ,  0.        ,  0.        , ..., -0.0045166 ,
        -0.00305176, -0.00027466], dtype=float32),
 (4800064,))

In [1]:
import torch
nt  = torch.from_numpy(ns)

NameError: name 'ns' is not defined

In [7]:
ns, nt.shape

NameError: name 'ns' is not defined

In [8]:
torch.tensor(ns).shape

NameError: name 'ns' is not defined

In [9]:
from dataset import AudioDataset
sr = 16000
datasets = {
    "train": AudioDataset("dev_dataset/train", sr),
    "eval":  AudioDataset("dev_dataset/eval", sr)
}
batch_size = {
    "train": 16,
    "eval": 8
}

In [11]:
dataloaders = {k: torch.utils.data.DataLoader(datasets[k], batch_size[k]) for k, v in datasets.items()}
dataloaders

{'train': <torch.utils.data.dataloader.DataLoader at 0x77b0c271a3b0>,
 'eval': <torch.utils.data.dataloader.DataLoader at 0x77b0c2719bd0>}

In [12]:
for idx, (mix, target) in enumerate(dataloaders["train"]):
    print(mix, mix.shape)
    print(target.shape)

tensor([[ 5.4932e-04, -2.1362e-04, -3.9673e-04,  ...,  1.7883e-02,
          1.8860e-02,  5.4932e-03],
        [-9.1553e-04, -9.1553e-04, -9.4604e-04,  ...,  2.8809e-02,
          3.1403e-02,  1.0284e-02],
        [ 1.0071e-03,  8.5449e-04,  6.4087e-04,  ...,  2.8534e-02,
          3.2593e-02,  1.4374e-02],
        ...,
        [ 3.0518e-05,  3.0518e-05,  3.0518e-05,  ...,  2.9755e-02,
          3.1708e-02,  8.6975e-03],
        [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  9.1553e-03,
          8.7280e-03, -2.0935e-02],
        [ 9.1553e-05,  3.0518e-05,  6.1035e-05,  ..., -1.5411e-02,
         -2.8595e-02, -6.3049e-02]]) torch.Size([16, 64000])
torch.Size([16, 64000])
tensor([[-7.1533e-02, -6.4270e-02, -7.0282e-02,  ..., -1.5625e-02,
         -3.3966e-02, -7.0770e-02],
        [-6.1035e-05,  9.1553e-05,  2.4414e-04,  ...,  3.1525e-02,
          4.1962e-02,  1.6479e-02],
        [ 1.2207e-04, -6.1035e-05, -6.1035e-05,  ...,  4.4189e-02,
          4.9988e-02,  2.0782e-02],
        ...

In [2]:
import torch.nn as nn
class STFTModule(nn.Module):
    def __init__(self, n_fft=512, hop_length=256, win_length=512):
        super().__init__()
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.win_length = win_length

    def forward(self, x):
        return torch.stft(x, n_fft=self.n_fft, hop_length=self.hop_length, 
                          win_length=self.win_length, return_complex=True)

In [3]:
stft = STFTModule()

In [11]:
import torch, librosa
x, _ = librosa.load("noise.wav", sr=16000)
x = torch.tensor(x)

In [16]:
out = stft(x)
out

tensor([[-4.3976e-02+0.0000e+00j, -6.4697e-02+0.0000e+00j,
          8.2275e-02+0.0000e+00j,  ...,
          4.8779e-01+0.0000e+00j,  2.4918e-01+0.0000e+00j,
         -2.1210e-01+0.0000e+00j],
        [ 1.4768e-01-1.4901e-08j,  1.0476e-01+7.5145e-02j,
          3.4236e-02-6.0079e-03j,  ...,
         -4.3754e-01+8.8217e-02j,  2.9384e-01-1.7450e-01j,
          1.3007e-01-1.7565e-01j],
        [ 2.5019e-02+0.0000e+00j, -1.9354e-01+6.9217e-02j,
         -1.1773e-01+7.7926e-02j,  ...,
          2.1350e-03+7.5048e-03j, -1.7088e-01-1.3453e-01j,
         -5.6273e-02+2.2145e-01j],
        ...,
        [-4.8566e-04-1.8626e-09j, -6.1974e-03+3.4386e-02j,
         -3.2119e-02+3.1435e-02j,  ...,
         -1.3333e-02-3.9952e-03j,  9.7241e-04+2.0733e-03j,
          6.3498e-03+3.1082e-02j],
        [ 1.7100e-02+0.0000e+00j, -4.7075e-02+2.3444e-02j,
          3.5338e-02-2.7346e-02j,  ...,
          1.0179e-02+2.3379e-03j, -1.7444e-02+4.2087e-03j,
         -6.1399e-03-1.1095e-02j],
        [ 2.3041e-02+0

In [20]:
out.abs()
# mix_phase = mix_stft.angle()

tensor([[4.3976e-02, 6.4697e-02, 8.2275e-02,  ..., 4.8779e-01, 2.4918e-01,
         2.1210e-01],
        [1.4768e-01, 1.2893e-01, 3.4759e-02,  ..., 4.4634e-01, 3.4175e-01,
         2.1856e-01],
        [2.5019e-02, 2.0555e-01, 1.4118e-01,  ..., 7.8026e-03, 2.1748e-01,
         2.2849e-01],
        ...,
        [4.8566e-04, 3.4940e-02, 4.4942e-02,  ..., 1.3919e-02, 2.2900e-03,
         3.1724e-02],
        [1.7100e-02, 5.2590e-02, 4.4683e-02,  ..., 1.0444e-02, 1.7945e-02,
         1.2681e-02],
        [2.3041e-02, 2.1118e-02, 3.3386e-02,  ..., 1.6479e-03, 3.8147e-03,
         1.5747e-02]])

In [21]:
out.angle()

tensor([[ 3.1416e+00,  3.1416e+00,  0.0000e+00,  ...,  0.0000e+00,
          0.0000e+00,  3.1416e+00],
        [-1.0090e-07,  6.2224e-01, -1.7372e-01,  ...,  2.9426e+00,
         -5.3591e-01, -9.3338e-01],
        [ 0.0000e+00,  2.7981e+00,  2.5569e+00,  ...,  1.2936e+00,
         -2.4746e+00,  1.8196e+00],
        ...,
        [-3.1416e+00,  1.7491e+00,  2.3669e+00,  ..., -2.8505e+00,
          1.1322e+00,  1.3693e+00],
        [ 0.0000e+00,  2.6795e+00, -6.5858e-01,  ...,  2.2578e-01,
          2.9048e+00, -2.0762e+00],
        [ 0.0000e+00,  3.1416e+00,  3.1416e+00,  ...,  0.0000e+00,
          0.0000e+00,  0.0000e+00]])

In [6]:
import torch
def pad_to_match(x, y):
        diff_h = y.size(2) - x.size(2)
        diff_w = y.size(3) - x.size(3)
        
        if diff_h > 0 or diff_w > 0:
            x = torch.nn.functional.pad(x, [diff_w // 2, diff_w - diff_w // 2,
                          diff_h // 2, diff_h - diff_h // 2])
        return x
x = torch.randn((1, 16, 32, 30))
y = torch.randn((1, 16, 32, 31))
# s = torch.nn.functional.pad(x, (0, 1, 0, 0))
x.shape

torch.Size([1, 16, 32, 30])

In [4]:
pad_to_match(x, y)

IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 2)

In [17]:
from model import UNet
import torch
model = UNet()
model

UNet(
  (encoder1): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.01, inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01, inplace=True)
  )
  (encoder2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.01, inplace=True)
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01, inplace=True)
  )
  (encoder3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.01, inplace=True)
    (2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01, inplace=True)
  )
  (encoder4): Sequential(
    (0): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(ne

In [11]:
import librosa, torch
mix, _ = librosa.load("mix_36.wav", sr=16000)

In [12]:
x = torch.tensor(mix).unsqueeze(0)

In [20]:
#pass this mix to unet model
model(x)[0].shape

torch.Size([1, 1, 257, 251])

In [21]:
257*251

64507

In [22]:
x.shape[1] - 64507

-507

In [23]:
64000/257

249.0272373540856

In [4]:
#get the model ouput shape summary pytorch
from torchsummary import summary
from model import SpeechEnhacementModel
model = SpeechEnhacementModel()
summary(model.unet, (1, 64000))


RuntimeError: Failed to run torchsummary. See above stack traces for more details. Executed layers up to: [Sequential: 1-1, Conv2d: 2-1, LeakyReLU: 2-2, Conv2d: 2-3, LeakyReLU: 2-4]

In [15]:
model.stft(x)[0].shape

torch.Size([1, 257, 251])

In [16]:
257*251

64507