### Import Packages

In [163]:
import os
import librosa
import librosa.display
import matplotlib.pyplot as plt
import soundfile

from sklearn.model_selection import train_test_split

import torch
import torchaudio
import torch.nn as nn
import torch.nn.functional as F
import torchaudio.transforms as T
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import numpy as np
import torchsummary

### Global attributes

In [164]:
mixed_dir = "../mixed_data/"
clean_dir = "../clean_data/"
nature_mixed_dir = "../classified_sound_1115/nature/mixed/"
nature_clean_dir = "../classified_sound_1115/nature/clean/"

### Preprocess Data

In [165]:
# MelSpectrogram參數
n_mels = 128                # 保持 Mel 頻譜圖的解析度
n_fft = 1024                # 提高 FFT 窗口大小以適配更多信號頻率
hop_length = 512            # 保持 hop_length 為 n_fft 的一半
win_length = 1024           # 窗口大小與 n_fft 保持一致（或設為 None 使用默認值）
sample_rate = 16000         # 採樣率保持不變，適合語音處理
f_max = sample_rate // 2    # 預設為 Nyquist 頻率，即 8000 Hz
duration = 5                # 音頻時長為 5 秒

In [166]:
import os
import torch
import librosa
import numpy as np
import soundfile as sf
from alive_progress import alive_bar
import time

# def save_spectrogram_as_npy(spectrogram, save_path):
#     """Save mel spectrogram as a NumPy array."""
#     np.save(save_path, spectrogram)  # Save as .npy file

# def sound_to_spectrogram(mixed_dir, clean_dir, sample_rate, duration, n_mels):
    
#     length = len([f for f in os.listdir(mixed_dir) if f != ".gitkeep"])
    
#     # print(f"Loading {length} files...")

#     with alive_bar(length, force_tty=True) as bar:
#         for filename in sorted(os.listdir(mixed_dir)):
#             if ".gitkeep" in filename:
#                 continue
#             try:
#                 # 使用完整路徑
#                 mixed_path = os.path.join(mixed_dir, filename)
#                 clean_path = os.path.join(clean_dir, filename)
                
#                 # 使用 soundfile 替代 librosa.load
#                 mixed_waveform, sr = sf.read(mixed_path)
#                 clean_waveform, sr = sf.read(clean_path)
                
#                 # 如果採樣率不匹配，進行重採樣
#                 if sr != sample_rate:
#                     mixed_waveform = librosa.resample(mixed_waveform, orig_sr=sr, target_sr=sample_rate)
#                     clean_waveform = librosa.resample(clean_waveform, orig_sr=sr, target_sr=sample_rate)
                
#                 # 如果指定了持續時間，裁剪音頻
#                 if duration:
#                     samples = int(duration * sample_rate)
#                     mixed_waveform = mixed_waveform[:samples]
#                     clean_waveform = clean_waveform[:samples]
                
#                 # 生成梅爾頻譜圖
#                 mixed_mel_spectrogram = librosa.feature.melspectrogram(
#                     y=mixed_waveform,
#                     sr=sample_rate,
#                     n_fft=n_fft,
#                     hop_length=hop_length,
#                     n_mels=n_mels
#                 )
#                 clean_mel_spectrogram = librosa.feature.melspectrogram(
#                     y=clean_waveform,
#                     sr=sample_rate,
#                     n_fft=n_fft,
#                     hop_length=hop_length,
#                     n_mels=n_mels
#                 )

#                 # 轉換為分貝刻度
#                 mixed_mel_spectrogram_db = librosa.power_to_db(
#                     mixed_mel_spectrogram, 
#                     ref=np.max, 
#                     amin=1e-10  # 避免log(0)
#                 )
#                 clean_mel_spectrogram_db = librosa.power_to_db(
#                     clean_mel_spectrogram, 
#                     ref=np.max, 
#                     amin=1e-10
#                 )
                
#                 # Save spectrograms as .npy files
#                 mixed_npy_path = os.path.join("../nature/mixed/", f"{filename[-4]}.npy")
#                 clean_npy_path = os.path.join("../nature/clean/", f"{filename[-4]}.npy")

#                 save_spectrogram_as_npy(mixed_mel_spectrogram_db, mixed_npy_path)
#                 save_spectrogram_as_npy(clean_mel_spectrogram_db, clean_npy_path)
                
#                 bar()
                
#             except Exception as e:
#                 print(f"Error processing file {filename}: {str(e)}")
#                 continue
            
# sound_to_spectrogram(nature_mixed_dir, nature_clean_dir, sample_rate, duration, n_mels)


In [167]:
def load_spectrogram_from_npy(mixed_dir, clean_dir):
    """Load mel spectrogram from a NumPy file."""
    mixed_mel_spectrograms = []
    clean_mel_spectrograms = []
    
    length = len(os.listdir(clean_dir))
    
    # print(f"Loading {length} files...")

    with alive_bar(length, force_tty=True) as bar:
        for filename in sorted(os.listdir(clean_dir)):
            if ".gitkeep" in filename:
                continue
            try:
                # 使用完整路徑
                mixed_path = os.path.join(mixed_dir, filename)
                clean_path = os.path.join(clean_dir, filename)
                
                mixed_mel_spectrogram_db = np.load(mixed_path)
                clean_mel_spectrogram_db = np.load(clean_path)

                # # 轉換為 PyTorch tensor 並添加通道維度
                mixed_mel_tensor = torch.tensor(mixed_mel_spectrogram_db, dtype=torch.float32).squeeze().unsqueeze(0)
                clean_mel_tensor = torch.tensor(clean_mel_spectrogram_db, dtype=torch.float32).squeeze().unsqueeze(0)
                print(mixed_mel_tensor.shape)
                print(clean_mel_tensor.shape)
                mixed_mel_spectrograms.append(mixed_mel_tensor)
                clean_mel_spectrograms.append(clean_mel_tensor)
                
                bar()
                
            except Exception as e:
                print(f"Error load file {filename}: {str(e)}")
                continue

    return mixed_mel_spectrograms, clean_mel_spectrograms

In [168]:
mixed_mel_spectrograms, clean_mel_spectrograms = load_spectrogram_from_npy("../nature/mixed/", "../nature/clean/")
mixed_mel_spectrograms_train, mixed_mel_spectrograms_val, clean_mel_spectrograms_train, clean_mel_spectrograms_val = train_test_split(mixed_mel_spectrograms, clean_mel_spectrograms, test_size=0.2, random_state=42)
time_steps = mixed_mel_spectrograms[0].shape[2]

on 0: torch.Size([1, 80000, 128])                                               
on 0: torch.Size([1, 128, 157])                                                 
on 1: torch.Size([1, 80000, 128])                                               
on 1: torch.Size([1, 128, 157])                                                 
on 2: torch.Size([1, 80000, 128])                                               
on 2: torch.Size([1, 128, 157])                                                 
on 3: torch.Size([1, 80000, 128])                                               
on 3: torch.Size([1, 128, 157])                                                 
on 4: torch.Size([1, 80000, 128])                                               
on 4: torch.Size([1, 128, 157])                                                 
on 5: torch.Size([1, 80000, 128])                                               
on 5: torch.Size([1, 128, 157])                                                 
on 6: torch.Size([1, 80000, 

KeyboardInterrupt: 

### Training

In [None]:
class DenoiseAutoencoder(nn.Module):
    def __init__(self):
        super(DenoiseAutoencoder, self).__init__()
        # 編碼器
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
        )
        # 解碼器
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(1),
        )
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)

        # Crop to match the exact target size if needed
        if x.size(-1) > time_steps:
            x = x[..., :time_steps]  # Crop time steps to match target (batch_size, 1, 128, 54)

        return x

In [None]:
# Model參數
num_epochs = 1000
batch_size = 32
learning_rate = 0.1
lr_decay_step = 40
lr_decay_gamma = 0.9

In [None]:
class AudioDataset(Dataset):
    def __init__(self, mixed_data, clean_data):
        self.mixed = mixed_data
        self.clean = clean_data
    
    def __len__(self):
        return len(self.mixed)
    
    def __getitem__(self, idx):
        return self.mixed[idx], self.clean[idx]

dataset = AudioDataset(mixed_mel_spectrograms_train, clean_mel_spectrograms_train)
dataloader = DataLoader(dataset, batch_size=batch_size)


In [None]:
from torch.optim.lr_scheduler import StepLR
model = DenoiseAutoencoder()
# example_input = torch.randn(1, 1, 128, time_steps)  # Batch size = 1, Channels = 1
# output = model(example_input)
# print("Output shape:", output.shape)  # Should be (1, 1, 128, time_steps)

torchsummary.summary(model,(1,64,44))
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=lr_decay_step, gamma=lr_decay_gamma)

# 訓練過程
for epoch in range(num_epochs):
    model.train()
    for (mixed, clean) in dataloader:
        
        optimizer.zero_grad()

        # 前向傳播
        outputs = model(mixed)
        loss = criterion(outputs, clean)
            
        # 反向傳播和優化
        loss.backward()
        optimizer.step()
        
    # Step the scheduler to decay the learning rate
    scheduler.step()
    
    # Optionally, print the current learning rate and loss
    current_lr = scheduler.get_last_lr()[0]
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.12f}, Learning Rate: {current_lr:.8f}")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 16, 32, 22]             160
       BatchNorm2d-2           [-1, 16, 32, 22]              32
              ReLU-3           [-1, 16, 32, 22]               0
            Conv2d-4           [-1, 32, 16, 11]           4,640
       BatchNorm2d-5           [-1, 32, 16, 11]              64
              ReLU-6           [-1, 32, 16, 11]               0
            Conv2d-7             [-1, 64, 8, 6]          18,496
       BatchNorm2d-8             [-1, 64, 8, 6]             128
              ReLU-9             [-1, 64, 8, 6]               0
           Conv2d-10            [-1, 128, 4, 3]          73,856
      BatchNorm2d-11            [-1, 128, 4, 3]             256
             ReLU-12            [-1, 128, 4, 3]               0
  ConvTranspose2d-13             [-1, 64, 8, 6]          73,792
      BatchNorm2d-14             [-1, 6

  return F.mse_loss(input, target, reduction=self.reduction)


RuntimeError: The size of tensor a (128) must match the size of tensor b (157) at non-singleton dimension 3

### Prediction

In [None]:
testfilename = "1034-121119-0049.wav"

In [None]:
# CLEAN
waveform, sample_rate = librosa.load(''.join([clean_dir, testfilename]))

clean_mel = clean_mel_spectrograms[0]
clean_output = clean_mel.squeeze(0).squeeze(0).detach().numpy()
clean_output = librosa.db_to_power(clean_output)

audio_signal = librosa.feature.inverse.mel_to_audio(clean_output, sr=sample_rate, n_iter=500)
audio_signal = audio_signal / np.max(np.abs(audio_signal))


librosa.display.waveshow(audio_signal, sr=sample_rate)
soundfile.write('test_librosa_clean.wav', audio_signal, sample_rate)

In [None]:
# MIXED
waveform, sample_rate = librosa.load(''.join([mixed_dir, testfilename]))

mixed_mel = mixed_mel_spectrograms[0]
mixed_output = mixed_mel.squeeze(0).squeeze(0).detach().numpy()
mixed_output = librosa.db_to_power(mixed_output)

audio_signal = librosa.feature.inverse.mel_to_audio(mixed_output, sr=sample_rate, n_iter=500)
audio_signal = audio_signal / np.max(np.abs(audio_signal))


librosa.display.waveshow(audio_signal, sr=sample_rate)
soundfile.write('test_librosa_mixed.wav', audio_signal, sample_rate)

In [None]:
# DENOISED
model.eval()  # 设置模型为评估模式

denoised_output = model(mixed_mel_spectrograms[0].unsqueeze(0))
denoised_output = denoised_output.squeeze(0).squeeze(0).detach().numpy()
denoised_output = librosa.db_to_power(denoised_output)

audio_signal = librosa.feature.inverse.mel_to_audio(denoised_output, sr=sample_rate, n_iter=500)
audio_signal = audio_signal / np.max(np.abs(audio_signal))

librosa.display.waveshow(audio_signal, sr=sample_rate)
soundfile.write('test_librosa_denoised.wav', audio_signal, sample_rate)
