# 선언

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio

from torch.utils.data import DataLoader, Dataset, ConcatDataset

import numpy as np
import matplotlib.pyplot as plt

import os

# 데이터 전처리

1. 소음 구간 태깅

2. 데이터 분할

In [None]:
class NoiseDataset(Dataset):
    """Dataset of fixed-length audio frames whose energy exceeds a threshold.

    The audio file is loaded once, resampled to ``sample_rate`` and cut into
    consecutive, non-overlapping frames of ``frame_size`` samples.  Only the
    frames whose mean squared amplitude (averaged over channels and samples)
    is strictly greater than ``threshold`` are exposed as items.

    Args:
        audio_file: Path handed to ``torchaudio.load``.
        sample_rate: Sample rate the loaded waveform is resampled to.
        frame_size: Number of samples in every frame.
        threshold: Energy level above which a frame is tagged as noise.

    Raises:
        ValueError: If ``frame_size`` is not positive.
    """

    def __init__(self, audio_file, sample_rate=16000, frame_size=1024, threshold=0.05):
        if frame_size <= 0:
            raise ValueError(f"frame_size must be positive, got {frame_size}")
        waveform, orig_sample_rate = torchaudio.load(audio_file)
        self.waveform = torchaudio.transforms.Resample(orig_freq=orig_sample_rate, new_freq=sample_rate)(waveform)
        self.sample_rate = sample_rate
        self.frame_size = frame_size
        self.threshold = threshold
        self.noise_indices = self._detect_noise(self.waveform)

    def _detect_noise(self, waveform):
        """Return the start sample of every frame tagged as noise.

        Args:
            waveform: Tensor indexed as ``[channel, sample]``.

        Returns:
            A list of start offsets (multiples of ``frame_size``), in
            ascending order.  Empty when the waveform is shorter than one
            frame.
        """
        # Per-sample energy, averaged across channels.
        energy = waveform.pow(2).mean(dim=0)
        if energy.numel() < self.frame_size:
            return []
        # unfold() yields every complete frame, including the final one when
        # the length is an exact multiple of frame_size; a trailing partial
        # frame is discarded.
        frames = energy.unfold(0, self.frame_size, self.frame_size)
        frame_energy = frames.mean(dim=1).tolist()
        return [
            frame_no * self.frame_size
            for frame_no, value in enumerate(frame_energy)
            if value > self.threshold
        ]

    def __len__(self):
        """Return the number of frames tagged as noise."""
        return len(self.noise_indices)

    def __getitem__(self, idx):
        """Return the ``idx``-th noisy frame as an ``(input, target)`` pair.

        Both elements are the same ``[channel, frame_size]`` slice of the
        waveform, so a model trained on these pairs learns to reproduce the
        frame it is given.
        """
        start_idx = self.noise_indices[idx]
        end_idx = start_idx + self.frame_size
        noisy_segment = self.waveform[:, start_idx:end_idx]
        return noisy_segment, noisy_segment


# LSTM 모델

In [None]:
class AntiNoiseLSTM(nn.Module):
    """Stacked LSTM followed by a linear layer applied at every time step.

    Args:
        input_size: Number of features per time step fed to the LSTM.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of features produced per time step.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        # batch_first=True: tensors are laid out as (batch, time, feature).
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Map ``x`` through the LSTM and project every hidden state."""
        hidden_states, _final_state = self.lstm(x)
        return self.fc(hidden_states)

# 학습 및 평가

In [None]:
def _as_sequence_batch(frames):
    """Reshape framed audio to a float ``(sequences, frame_size, 1)`` tensor.

    Every leading dimension is folded into the sequence dimension, so a
    ``(batch, channels, frame_size)`` batch becomes ``batch * channels``
    independent single-feature sequences.  A plain ``unsqueeze(-1)`` would
    turn such a batch into a 4-D tensor, which ``nn.LSTM`` rejects.
    """
    return frames.reshape(-1, frames.size(-1)).unsqueeze(-1).float()


def train_model(dataloader, model, criterion, optimizer, num_epochs = 10, validation_loader=None):
    """Train ``model`` and record the mean loss of every epoch.

    Args:
        dataloader: Sized iterable yielding ``(inputs, targets)`` batches.
        model: Module called on the reshaped inputs.
        criterion: Loss function called as ``criterion(outputs, targets)``.
        optimizer: Optimizer stepped once per batch.
        num_epochs: Number of passes over ``dataloader``.
        validation_loader: Optional loader evaluated after every epoch.

    Returns:
        ``(train_losses, val_losses)``: per-epoch mean losses.  ``val_losses``
        stays empty when ``validation_loader`` is ``None``.

    Raises:
        ZeroDivisionError: If ``dataloader`` has length zero.
    """
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        # evaluate_model() switches to eval mode, so re-enable training mode
        # at the start of every epoch.
        model.train()
        running_loss = 0.0

        for i, (inputs, targets) in enumerate(dataloader):
            inputs = _as_sequence_batch(inputs)
            targets = _as_sequence_batch(targets)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if (i + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(dataloader)}], Loss: {loss.item():.4f}')

        epoch_loss = running_loss / len(dataloader)
        train_losses.append(epoch_loss)

        if validation_loader is not None:
            val_loss = evaluate_model(validation_loader, model, criterion)
            val_losses.append(val_loss)
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_loss:.4f}, Validation Loss: {val_loss:.4f}')
        else:
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_loss:.4f}')

    return train_losses, val_losses


def evaluate_model(dataloader, model, criterion):
    """Return the mean per-batch loss of ``model`` over ``dataloader``.

    The model is put in eval mode and left there; gradients are disabled
    while the batches are processed.

    Args:
        dataloader: Sized iterable yielding ``(inputs, targets)`` batches.
        model: Module called on the reshaped inputs.
        criterion: Loss function called as ``criterion(outputs, targets)``.

    Raises:
        ZeroDivisionError: If ``dataloader`` has length zero.
    """
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = _as_sequence_batch(inputs)
            targets = _as_sequence_batch(targets)

            outputs = model(inputs)
            loss = criterion(outputs, targets)
            total_loss += loss.item()

    return total_loss / len(dataloader)

# 파라미터 선언

In [None]:
# Hyper-parameters shared by the model constructor and the optimizer.
val = dict(
    input_size=1,
    hidden_size=128,
    num_layers=2,
    output_size=1,
    learning_rate=0.001,
)

# 데이터 불러오기

In [None]:
def load_datasets_from_folder(folder_path, sample_rate=16000, frame_size=1024, threshold=0.05):
    """Build one dataset from every ``.wav`` file directly inside a folder.

    Args:
        folder_path: Directory to scan (not recursive).
        sample_rate: Forwarded to ``NoiseDataset``.
        frame_size: Forwarded to ``NoiseDataset``.
        threshold: Forwarded to ``NoiseDataset``.

    Returns:
        A ``ConcatDataset`` chaining one ``NoiseDataset`` per file, in
        sorted file-name order.

    Raises:
        ValueError: If the folder contains no ``.wav`` files.
    """
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # item order of the combined dataset reproducible across runs.
    wav_names = sorted(
        name for name in os.listdir(folder_path) if name.lower().endswith(".wav")
    )
    if not wav_names:
        # ConcatDataset would otherwise fail on an empty list with a message
        # that does not mention the folder.
        raise ValueError(f"no .wav files found in {folder_path!r}")

    datasets = [
        NoiseDataset(os.path.join(folder_path, name), sample_rate, frame_size, threshold)
        for name in wav_names
    ]
    return ConcatDataset(datasets)

# Directory scanned for training .wav files (placeholder value; set before running).
folder_path = 'your_folder_path'
combined_dataset = load_datasets_from_folder(folder_path)

# Shuffled mini-batches of 64 items drawn from the combined dataset.
dataloader = DataLoader(dataset=combined_dataset, batch_size=64, shuffle=True)

# 모델, 손실 함수, 옵티마이저 초기화

In [None]:
# Build the network from the hyper-parameter table, then pair it with a
# mean-squared-error loss and an Adam optimizer over all of its parameters.
anti_noise_model = AntiNoiseLSTM(
    input_size=val['input_size'],
    hidden_size=val['hidden_size'],
    num_layers=val['num_layers'],
    output_size=val['output_size'],
)
criterion = nn.MSELoss()
optimizer = optim.Adam(params=anti_noise_model.parameters(), lr=val['learning_rate'])

# 학습

In [None]:
# Train with the function's default epoch count and no validation loader.
train_model(
    dataloader=dataloader,
    model=anti_noise_model,
    criterion=criterion,
    optimizer=optimizer,
)

# 테스트

In [None]:
def generate_and_plot_anti_noise(model, test_audio_file, sample_rate=16000, frame_size=1024):
    """Run ``model`` over an audio file frame by frame and plot the result.

    The file is loaded, resampled to ``sample_rate`` and split into
    consecutive non-overlapping frames of ``frame_size`` samples.  Each frame
    is fed to ``model`` with every channel treated as its own single-feature
    sequence; the predictions are joined along time and drawn on top of the
    resampled input.  A trailing partial frame is not predicted.

    Args:
        model: Module mapping ``(channels, frame_size, 1)`` tensors to a
            tensor whose last dimension has size 1.
        test_audio_file: Path handed to ``torchaudio.load``.
        sample_rate: Sample rate the audio is resampled to.
        frame_size: Number of samples per frame.

    Raises:
        ValueError: If ``frame_size`` is not positive or the resampled audio
            is shorter than one frame.
    """
    if frame_size <= 0:
        raise ValueError(f"frame_size must be positive, got {frame_size}")

    waveform, orig_sample_rate = torchaudio.load(test_audio_file)
    waveform = torchaudio.transforms.Resample(orig_freq=orig_sample_rate, new_freq=sample_rate)(waveform)

    num_samples = waveform.size(1)
    if num_samples < frame_size:
        raise ValueError(
            f"audio has {num_samples} samples after resampling, "
            f"fewer than one frame of {frame_size}"
        )

    model.eval()
    predicted_frames = []

    with torch.no_grad():
        # "+ 1" keeps the final frame when the length is an exact multiple
        # of frame_size.
        for start in range(0, num_samples - frame_size + 1, frame_size):
            segment = waveform[:, start:start + frame_size]
            input_tensor = segment.unsqueeze(-1).float()
            predicted_anti_noise = model(input_tensor).squeeze(-1)
            # Keep the channel axis so mono and multi-channel audio are
            # handled the same way.
            predicted_frames.append(predicted_anti_noise.numpy())

    predicted_waveform = np.concatenate(predicted_frames, axis=-1)

    plt.figure(figsize=(14, 6))
    # Transposed to (samples, channels) so matplotlib draws one line per channel.
    plt.plot(waveform.numpy().T, label='Original Noise')
    plt.plot(predicted_waveform.T, label='Predicted Anti-Noise', color='orange', alpha=0.7)
    plt.title('Original Noise vs Predicted Anti-Noise')
    plt.legend()
    plt.show()


# 테스트 실행

In [None]:
# Audio clip to run through the trained model (placeholder value; set before running).
test_audio_file = 'path'

generate_and_plot_anti_noise(model=anti_noise_model, test_audio_file=test_audio_file)