Tiền xử lý trước khi đưa vào UNET

In [None]:
import os
import random
import numpy as np
import librosa
import torch
import kagglehub

print("Torch:", torch.__version__)
if torch.cuda.is_available():
    print("✅ GPU:", torch.cuda.get_device_name(0))
else:
    print("❌ CPU only")

Torch: 2.5.1+cu121
✅ GPU: NVIDIA GeForce RTX 4060 Laptop GPU


In [9]:
vivos_path = kagglehub.dataset_download(
    "kynthesis/vivos-vietnamese-speech-corpus-for-asr"
)

demand_path = kagglehub.dataset_download(
    "chrisfilo/demand"
)

print("VIVOS path:", vivos_path)
print("DEMAND path:", demand_path)


Downloading to C:\Users\quany\.cache\kagglehub\datasets\kynthesis\vivos-vietnamese-speech-corpus-for-asr\1.archive...


100%|██████████| 1.37G/1.37G [01:06<00:00, 22.2MB/s]

Extracting files...





Downloading to C:\Users\quany\.cache\kagglehub\datasets\chrisfilo\demand\1.archive...


100%|██████████| 6.87G/6.87G [05:25<00:00, 22.6MB/s]  

Extracting files...





VIVOS path: C:\Users\quany\.cache\kagglehub\datasets\kynthesis\vivos-vietnamese-speech-corpus-for-asr\versions\1
DEMAND path: C:\Users\quany\.cache\kagglehub\datasets\chrisfilo\demand\versions\1


In [10]:
def collect_wavs(root):
    return [
        os.path.join(r, f)
        for r, _, files in os.walk(root)
        for f in files if f.lower().endswith(".wav")
    ]

speech_files = collect_wavs(vivos_path)
noise_files  = collect_wavs(demand_path)

print("Speech files:", len(speech_files))
print("Noise files :", len(noise_files))

assert len(speech_files) > 0
assert len(noise_files) > 0


Speech files: 12420
Noise files : 560


In [11]:
SEGMENT_SEC = 0.3
SNR_MIN, SNR_MAX = 0, 20


def pad_to_length(x, target_len):
    if len(x) < target_len:
        x = np.pad(x, (0, target_len - len(x)))
    return x


def match_length(x, target_len):
    if len(x) < target_len:
        x = np.tile(x, int(np.ceil(target_len / len(x))))
    return x[:target_len]


def mix_with_snr(clean, noise, snr_db):
    clean_power = np.mean(clean ** 2)
    noise_power = np.mean(noise ** 2)

    noise = noise * np.sqrt(
        clean_power / (noise_power * 10 ** (snr_db / 10))
    )
    return clean + noise


In [12]:
def waveform_to_spectrogram(
    y,
    sr,
    n_fft=512,
    hop_length=256,
    target_shape=(256, 256)
):
    """
    Convert waveform -> log-magnitude spectrogram
    Output shape: (256, 256, 1)
    """
    stft = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
    mag = np.abs(stft)

    log_mag = np.log1p(mag)

    # normalize to [0, 1]
    log_mag = (log_mag - log_mag.min()) / (
        log_mag.max() - log_mag.min() + 1e-8
    )

    spec = np.zeros(target_shape, dtype=np.float32)
    h = min(log_mag.shape[0], target_shape[0])
    w = min(log_mag.shape[1], target_shape[1])
    spec[:h, :w] = log_mag[:h, :w]

    return spec[..., np.newaxis]  # (256,256,1)


In [13]:
def generate_one_sample():
    """
    Generate ONE training sample on-the-fly

    Returns
    -------
    noisy_spec : np.ndarray (256,256,1)
        Input for U-Net
    clean_spec : np.ndarray (256,256,1)
        Target for U-Net
    meta : dict
        Metadata for debugging / logging
    """

    # 1️⃣ Random speech file
    sp_path = random.choice(speech_files)
    speech, sr = librosa.load(sp_path, sr=None, mono=True)

    # 2️⃣ Random 0.3s segment
    seg_len = int(sr * SEGMENT_SEC)
    max_start = max(0, len(speech) - seg_len)
    start = random.randint(0, max_start)

    clean = speech[start:start + seg_len]
    clean = pad_to_length(clean, seg_len)

    # 3️⃣ Random noise file
    noise_path = random.choice(noise_files)
    noise, _ = librosa.load(noise_path, sr=sr, mono=True)
    noise = match_length(noise, len(clean))

    # 4️⃣ Random SNR
    snr_db = random.uniform(SNR_MIN, SNR_MAX)
    noisy = mix_with_snr(clean, noise, snr_db)

    # 5️⃣ Convert to spectrogram (model input)
    clean_spec = waveform_to_spectrogram(clean, sr)
    noisy_spec = waveform_to_spectrogram(noisy, sr)

    meta = {
        "speech_path": sp_path,
        "noise_path": noise_path,
        "sample_rate": sr,
        "segment_samples": seg_len,
        "segment_seconds": SEGMENT_SEC,
        "snr_db": snr_db,
        "waveform_rms_clean": float(np.sqrt(np.mean(clean ** 2))),
        "waveform_rms_noisy": float(np.sqrt(np.mean(noisy ** 2))),
    }

    return noisy_spec, clean_spec, meta


In [14]:
noisy_spec, clean_spec, meta = generate_one_sample()

print("=== SAMPLE METADATA ===")
for k, v in meta.items():
    print(f"{k}: {v}")

print("\n=== TENSOR SHAPES ===")
print("Noisy spec shape :", noisy_spec.shape)
print("Clean spec shape :", clean_spec.shape)

print("\n=== VALUE RANGE ===")
print("Noisy  min/max:", noisy_spec.min(), noisy_spec.max())
print("Clean  min/max:", clean_spec.min(), clean_spec.max())


=== SAMPLE METADATA ===
speech_path: C:\Users\quany\.cache\kagglehub\datasets\kynthesis\vivos-vietnamese-speech-corpus-for-asr\versions\1\vivos\train\waves\VIVOSSPK01\VIVOSSPK01_R122.wav
noise_path: C:\Users\quany\.cache\kagglehub\datasets\chrisfilo\demand\versions\1\OHALLWAY_16k\OHALLWAY\ch06.wav
sample_rate: 16000
segment_samples: 4800
segment_seconds: 0.3
snr_db: 0.36762275060037153
waveform_rms_clean: 0.050982117652893066
waveform_rms_noisy: 0.07055313885211945

=== TENSOR SHAPES ===
Noisy spec shape : (256, 256, 1)
Clean spec shape : (256, 256, 1)

=== VALUE RANGE ===
Noisy  min/max: 0.0 1.0
Clean  min/max: 0.0 1.0
