### 비교용 템플릿

먼저 메트릭을 뽑아내는 부분입니다


In [67]:
import os
from collections import defaultdict
from tqdm import tqdm
import time
import warnings
import json
import traceback

warnings.filterwarnings("ignore")

import numpy as np
from numpy.typing import NDArray
from scipy.io import wavfile

from pesq import pesq
from pystoi import stoi

from pathlib import Path
import wave


def prettier(obj):
    print(json.dumps(obj, indent=4, sort_keys=True))


def next_power_of_2(x):
    return 1 if x == 0 else 2 ** (x - 1).bit_length()


def result_to_metric(result):
    metric = defaultdict(float)

    prettier(result)
    metric["pesq_wb"] = result["pesq_wb"] / result["count"]
    metric["pesq_nb"] = result["pesq_nb"] / result["count"]
    metric["stoi"] = result["stoi"] / result["count"]
    metric["rtf"] = result["infer_time"] / result["length"]

    return metric


def save_enhanced_audio(rate, enhanced_audio, output_dir, filename):
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Convert floating point audio to int16
    if enhanced_audio.dtype != np.int16:
        enhanced_audio = np.int16(
            enhanced_audio / np.max(np.abs(enhanced_audio)) * 32767
        )

    # Save the enhanced audio using wave module
    output_path = os.path.join(output_dir, filename)
    wf = wave.open(output_path, "wb")
    wf.setnchannels(1)  # Assuming mono audio
    wf.setsampwidth(2)  # Assuming 16-bit audio
    wf.setframerate(rate)
    wf.writeframes(enhanced_audio.tostring())
    wf.close()


def eval_metric(
    infer, target_name, testset_path, load: bool, output_base_dir="enhanced_output"
):
    result = defaultdict(int)

    cleans = os.listdir(os.path.join(testset_path, "clean"))
    # noises = os.listdir(os.path.join(testset_path, "noisy"))

    output_dir = os.path.join(output_base_dir, target_name)

    for i in tqdm(range(len(cleans))):
        duration = 0
        try:
            rate, clean = wavfile.read(os.path.join(testset_path, "clean", cleans[i]))
            rate, noisy = wavfile.read(os.path.join(testset_path, "noisy", cleans[i]))
            if load:
                rate, target_wav = wavfile.read(
                    os.path.join(testset_path, target_name, cleans[i])
                )
            else:
                # As we infer on the CPU device, we don't need to sync with GPU.
                # So, we can utilize time.
                start_time = time.time()
                rate, target_wav = infer(rate, noisy, i)
                duration = time.time() - start_time

            # Save the enhanced audio
            save_enhanced_audio(rate, target_wav, output_dir, cleans[i])
        except Exception as e:
            traceback.print_exc()
            return
            continue

        n_samples = target_wav.shape[-1]
        length = n_samples / rate

        result["pesq_wb"] += (
            pesq(16000, clean, target_wav, "wb") * n_samples
        )  # wide band
        result["pesq_nb"] += (
            pesq(16000, clean, target_wav, "nb") * n_samples
        )  # narrow band
        result["stoi"] += stoi(clean, target_wav, rate) * n_samples
        result["count"] += 1 * n_samples
        result["length"] += length
        result["infer_time"] += duration

    if result["count"] is None:
        return None
    metric = result_to_metric(result)
    return metric

infer에 들어갈 spectral_subtraction 함수입니다.

메트릭을 구하기 위해 길이가 같아야 해서 코드를 적절히 변경하였습니다.

해당 코드를 비교해보시면 변경된 부분을 쉽게 구하실 수 있을 거에요.


In [68]:
def spectral_subtraction(rate: int, noisy: NDArray, target_name: str):
    fft = abs(np.fft.fft(noisy))
    len_ = 20 * rate // 1000  # frame size in samples
    PERC = 50  # window overlap in percent of frame
    len1 = len_ * PERC // 100  # overlap'length
    len2 = len_ - len1  # window'length - overlap'length

    # setting default parameters
    Thres = 3  # VAD threshold in dB SNRseg
    Expnt = 1.0  # exp(Expnt)
    G = 0.9

    # initial Hamming window
    win = np.hamming(len_)
    # normalization gain for overlap+add with 50% overlap
    winGain = len2 / sum(win)

    # nFFT = 2 * 2 ** (nextpow2.nextpow2(len_))
    nFFT = 2 * next_power_of_2(len_)
    noise_mean = np.zeros(nFFT)
    j = 1
    for k in range(1, 6):
        noise_mean = noise_mean + abs(np.fft.fft(win * noisy[j : j + len_], nFFT))
        j = j + len_
    noise_mu = noise_mean / 5

    # initialize various variables
    k = 1
    img = 1j
    x_old = np.zeros(len1)
    Nframes = len(noisy) // len2 - 1
    xfinal = np.zeros(noisy.shape[0])

    # === Start Processing === #
    for n in range(0, Nframes):
        # Windowing
        insign = win * noisy[k - 1 : k + len_ - 1]
        # compute fourier transform of a frame
        spec = np.fft.fft(insign, nFFT)
        # compute the magnitude
        sig = abs(spec)
        # save the noisy phase information
        theta = np.angle(spec)
        # SNR
        SNRseg = 10 * np.log10(
            np.linalg.norm(sig, 2) ** 2 / np.linalg.norm(noise_mu, 2) ** 2
        )

        # --- spectral subtraction --- #
        sub_speech = sig**Expnt - noise_mu**Expnt
        # the pure signal is less than the noise signal power
        diffw = sig**Expnt - noise_mu**Expnt

        # beta negative components
        def find_index(x_list):
            index_list = []
            for i in range(len(x_list)):
                if x_list[i] < 0:
                    index_list.append(i)
            return index_list

        z = find_index(diffw)
        if len(z) > 0:
            sub_speech[z] = 0

        # --- implement a simple VAD detector --- #
        if SNRseg < Thres:  # Update noise spectrum
            noise_temp = (
                G * noise_mu**Expnt + (1 - G) * sig**Expnt
            )  # Smoothing processing noise power spectrum
            noise_mu = noise_temp ** (1 / Expnt)  # New noise amplitude spectrum

        # add phase
        x_phase = (sub_speech ** (1 / Expnt)) * np.exp(img * theta)
        # take the IFFT
        xi = np.fft.ifft(x_phase).real

        # --- Overlap and add --- #
        xfinal[k - 1 : k + len2 - 1] = x_old + xi[0:len1]
        x_old = xi[0 + len1 : len_]

        k = k + len2

    xfinal[k - 1 : k + len2 - 1] = x_old

    return rate, winGain * xfinal.astype(noisy.dtype)

infer에 들어갈 MMSE 함수입니다.

메트릭을 구하기 위해 길이가 같아야 해서 코드를 적절히 변경하였습니다.

해당 코드를 비교해보시면 변경된 부분을 쉽게 구하실 수 있을 거에요.


In [69]:
import scipy.special as sp


def mmse(rate: int, noisy: NDArray, i: int):
    len_ = 20 * rate // 1000  # frame size in samples
    PERC = 50  # window overlap in percent of frame
    len1 = len_ * PERC // 100  # overlap'length
    len2 = len_ - len1  # window'length - overlop'length

    # setting default parameters
    aa = 0.98
    eta = 0.15
    Thres = 3
    mu = 0.98
    c = np.sqrt(np.pi) / 2
    ksi_min = 10 ** (-25 / 10)

    # hamming window
    win = np.hamming(len_)
    # normalization gain for overlap+add with 50% overlap
    winGain = len2 / sum(win)

    # setting initial noise
    nFFT = 2 * next_power_of_2(len_)
    j = 1
    noise_mean = np.zeros(nFFT)
    for k in range(1, 6):
        noise_mean = noise_mean + abs(np.fft.fft(win * noisy[j : j + len_], nFFT))
        j = j + len_
    noise_mu = noise_mean / 5
    noise_mu2 = noise_mu**2

    # initialize various variables
    k = 1
    img = 1j
    x_old = np.zeros(len2)
    Nframes = len(noisy) // len2 - 1
    xfinal = np.zeros(Nframes * len2)

    # === Start Processing ==== #
    for n in range(0, Nframes):

        # Windowing
        insign = win * noisy[k - 1 : k + len_ - 1]

        # Take fourier transform of frame
        spec = np.fft.fft(insign, nFFT)
        sig = abs(spec)
        sig2 = sig**2
        # save the noisy phase information
        theta = np.angle(spec)

        SNRpos = 10 * np.log10(
            np.linalg.norm(sig, 2) ** 2 / np.linalg.norm(noise_mu, 2) ** 2
        )

        # posteriori SNR
        gammak = np.minimum(sig2 / noise_mu2, 40)

        # decision-direct estimate of a priori SNR  P231 [7.75]
        if n == 0:
            ksi = aa + (1 - aa) * np.maximum(gammak - 1, 0)
        else:
            ksi = aa * Xk_prev / noise_mu2 + (1 - aa) * np.maximum(gammak - 1, 0)
            # limit ksi to -25 dB
            ksi = np.maximum(ksi_min, ksi)

        # --- implement a simple VAD detector --- #
        if SNRpos < Thres:  # Update noise spectrum
            noise_mu2 = (
                mu * noise_mu2 + (1 - mu) * sig2
            )  # Smoothing processing noise power spectrum
            noise_mu = np.sqrt(noise_mu2)

        # [7.40]
        vk = gammak * ksi / (1 + ksi)
        j_0 = sp.iv(
            0, vk / 2
        )  # modified bessel function of the first kind of real order
        j_1 = sp.iv(1, vk / 2)
        C = np.exp(-0.5 * vk)
        A = ((c * (vk**0.5)) * C) / gammak  # [7.40] A
        B = (1 + vk) * j_0 + vk * j_1  # [7.40] B
        hw = A * B  # [7.40]

        # get X(w)
        mmse_speech = hw * sig

        # save for estimation of a priori SNR in next frame
        Xk_prev = mmse_speech**2

        # IFFT
        x_phase = mmse_speech * np.exp(img * theta)
        xi_w = np.fft.ifft(x_phase, nFFT).real

        # overlap add
        xfinal[k - 1 : k + len2 - 1] = x_old + xi_w[0:len1]
        x_old = xi_w[len1 + 0 : len_]

        k = k + len2

    xfinal = winGain * xfinal.astype(noisy.dtype)

    # Overlap으로 인해 크기 차이 발생
    if len(xfinal) < len(noisy):
        xfinal = np.pad(xfinal, (0, len(noisy) - len(xfinal)), "constant")
    else:
        xfinal = xfinal[: len(noisy)]

    return rate, xfinal

infer에 들어갈 Wiener Filtering 함수입니다.

메트릭을 구하기 위해 길이가 같아야 해서 코드를 적절히 변경하였습니다.

해당 코드를 비교해보시면 변경된 부분을 쉽게 구하실 수 있을 거에요.


In [70]:
def wiener_filtering(rate: int, noisy: NDArray, i: int):
    len_ = 20 * rate // 1000  # frame size in samples
    PERC = 50  # window overlap in percent of frame
    len1 = len_ * PERC // 100  # overlap'length
    len2 = len_ - len1  # window'length - overlop'length

    # setting default parameters
    Thres = 3  # VAD threshold in dB SNRseg
    Expnt = 1.0
    G = 0.9

    # sine window
    i = np.linspace(0, len_ - 1, len_)
    win = np.sqrt(2 / (len_ + 1)) * np.sin(np.pi * (i + 1) / (len_ + 1))

    # normalization gain for overlap+add with 50% overlap
    winGain = len2 / sum(win)

    # setting initial noise
    nFFT = 2 * next_power_of_2(len_)
    j = 1
    noise_mean = np.zeros(nFFT)
    for k in range(1, 6):
        noise_mean = noise_mean + abs(np.fft.fft(win * noisy[j : j + len_], nFFT))
        j = j + len_
    noise_mu = noise_mean / 5

    # initialize various variables
    k = 1
    img = 1j
    x_old = np.zeros(len1)
    Nframes = len(noisy) // len2 - 1
    xfinal = np.zeros(Nframes * len2)

    # === Start Processing ==== #
    for n in range(0, Nframes):

        # Windowing
        insign = win * noisy[k - 1 : k + len_ - 1]
        # compute fourier transform of a frame
        spec = np.fft.fft(insign, nFFT)
        # compute the magnitude
        sig = abs(spec)
        # save the noisy phase information
        theta = np.angle(spec)
        # Posterior SNR
        SNRpos = 10 * np.log10(
            np.linalg.norm(sig, 2) ** 2 / np.linalg.norm(noise_mu, 2) ** 2
        )

        # --- wiener filtering --- #
        sub_speech = sig**Expnt - noise_mu**Expnt
        diffw = sig**Expnt - noise_mu**Expnt

        def find_index(x_list):
            index_list = []
            for i in range(len(x_list)):
                if x_list[i] < 0:
                    index_list.append(i)
            return index_list

        z = find_index(diffw)
        if len(z) > 0:
            sub_speech[z] = 0

        SNRpri = 10 * np.log10(
            np.linalg.norm(sub_speech, 2) ** 2 / np.linalg.norm(noise_mu, 2) ** 2
        )
        mel_max = 10
        mel_0 = (1 + 4 * mel_max) / 5
        s = 25 / (mel_max - 1)

        def get_mel(SNR):
            if -5.0 <= SNR <= 20.0:
                a = mel_0 - SNR / s
            else:
                if SNR < -5.0:
                    a = mel_max
                if SNR > 20:
                    a = 1
            return a

        mel = get_mel(SNRpri)
        G_k = sub_speech**2 / (sub_speech**2 + mel * noise_mu**2)
        wf_speech = G_k * sig

        if SNRpos < Thres:
            noise_temp = G * noise_mu**Expnt + (1 - G) * sig**Expnt
            noise_mu = noise_temp ** (1 / Expnt)

        x_phase = wf_speech * np.exp(img * theta)
        xi = np.fft.ifft(x_phase).real

        xfinal[k - 1 : k + len2 - 1] = x_old + xi[0:len1]
        x_old = xi[0 + len1 : len_]

        k = k + len2

    xfinal = winGain * xfinal.astype(noisy.dtype)

    # Overlap으로 인해 크기 차이 발생
    if len(xfinal) < len(noisy):
        xfinal = np.pad(xfinal, (0, len(noisy) - len(xfinal)), "constant")
    else:
        xfinal = xfinal[: len(noisy)]

    return rate, xfinal

메트릭을 측정합니다.


In [71]:
testset_path = "dataset/test"


def noisy(_rate, noisy, i):
    return _rate, noisy


targets = [
    {"name": "noisy", "infer": noisy, "rtf": 0, "load": True},
    {
        "name": "spectral_subtraction",
        "infer": spectral_subtraction,
        "rtf": None,
        "load": False,
    },
    {
        "name": "mmse",
        "infer": mmse,
        "rtf": None,
        "load": False,
    },
    {
        "name": "wiener_filtering",
        "infer": wiener_filtering,
        "rtf": None,
        "load": False,
    },
    {
        "name": "baseline",
        "infer": None,
        "rtf": 0,
        "load": True,
    },
]

metrics = {}
for target in targets:
    metric = eval_metric(target["infer"], target["name"], testset_path, target["load"])
    if target["rtf"] is not None:
        metric["rtf"] = target["rtf"]
    metrics[target["name"]] = metric
prettier(metrics)

100%|██████████| 412/412 [01:29<00:00,  4.63it/s]


{
    "count": 49972070,
    "infer_time": 0,
    "length": 1041.0847916666664,
    "pesq_nb": 128521217.5592233,
    "pesq_wb": 97583204.8157301,
    "stoi": 46353905.54622736
}


100%|██████████| 412/412 [02:02<00:00,  3.36it/s]


{
    "count": 49972070,
    "infer_time": 42.98717904090881,
    "length": 1041.0847916666664,
    "pesq_nb": 138189005.7308668,
    "pesq_wb": 107150051.56963086,
    "stoi": 46188702.048720405
}


100%|██████████| 412/412 [02:25<00:00,  2.84it/s]


{
    "count": 49972070,
    "infer_time": 64.70135641098022,
    "length": 1041.0847916666664,
    "pesq_nb": 130781402.69985962,
    "pesq_wb": 99690655.95318425,
    "stoi": 44975654.26802755
}


100%|██████████| 412/412 [02:01<00:00,  3.39it/s]


{
    "count": 49972070,
    "infer_time": 41.88861417770386,
    "length": 1041.0847916666664,
    "pesq_nb": 130986113.251472,
    "pesq_wb": 97041702.95830762,
    "stoi": 45550615.41517785
}


100%|██████████| 412/412 [01:26<00:00,  4.78it/s]

{
    "count": 49972070,
    "infer_time": 0,
    "length": 1041.0847916666664,
    "pesq_nb": 132173382.18717277,
    "pesq_wb": 102458708.39949071,
    "stoi": 46712186.653452545
}
{
    "baseline": {
        "pesq_nb": 2.644945110081947,
        "pesq_wb": 2.050319476449359,
        "rtf": 0,
        "stoi": 0.9347658932970466
    },
    "mmse": {
        "pesq_nb": 2.6170899604490994,
        "pesq_wb": 1.994927485557117,
        "rtf": 0.06214801803741673,
        "stoi": 0.9000158342055382
    },
    "noisy": {
        "pesq_nb": 2.5718609927350076,
        "pesq_wb": 1.9527549052046491,
        "rtf": 0,
        "stoi": 0.9275962661988458
    },
    "spectral_subtraction": {
        "pesq_nb": 2.7653248250646167,
        "pesq_wb": 2.14419878083159,
        "rtf": 0.041290756896074615,
        "stoi": 0.9242903495636744
    },
    "wiener_filtering": {
        "pesq_nb": 2.6211864597858763,
        "pesq_wb": 1.9419188150162205,
        "rtf": 0.04023554518613669,
        "stoi"




metrics를 표, 그래프 등으로 시각화합니다


In [None]:
import matplotlib.pyplot as plt
from collections import defaultdict
import numpy as np

import numpy as np
import matplotlib.pyplot as plt

# 주어진 데이터

    

# x 축 레이블
labels = ['pesq_nb', 'pesq_wb', 'rtf', 'stoi']

# 각 방법의 데이터를 리스트로 정리
noisy_data = [metrics['noisy'][label] for label in labels]
spectral_data = [metrics['spectral_subtraction'][label] for label in labels]
mmse_data = [metrics['mmse'][label] for label in labels]
wiener_data = [metrics['wiener_filtering'][label] for label in labels]

# 막대의 위치
x = np.arange(len(labels))

# 막대의 너비
width = 0.2

# 플롯 생성
fig, ax = plt.subplots(figsize=(12, 8))

# 각 데이터 방법에 대한 막대 그래프
rects1 = ax.bar(x - 1.5*width, noisy_data, width, label='noisy')
rects2 = ax.bar(x - 0.5*width, spectral_data, width, label='spectral_subtraction')
rects3 = ax.bar(x + 0.5*width, mmse_data, width, label='mmse')
rects4 = ax.bar(x + 1.5*width, wiener_data, width, label='wiener_filtering')

# x축 레이블 설정
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.set_ylabel('Values')
ax.set_title('Comparison of Different Denoising Methods')
ax.legend()

# 막대 위에 값 표시
def autolabel(rects):
    for rect in rects:
        height = rect.get_height()
        ax.annotate(f'{height:.2f}',
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')

autolabel(rects1)
autolabel(rects2)
autolabel(rects3)
autolabel(rects4)

# 레이아웃 조정
fig.tight_layout()

# 그래프 표시
plt.show()