# 개인 KWS(+화자게이트) 학습 노트북 — v1.1 (환경 패치판)

**중요:** Colab의 NumPy/바이너리 불일치 오류를 피하기 위해 첫 셀(0번)을 반드시 먼저 실행하세요. 첫 셀은 NumPy/SciPy 버전을 고정한 뒤 런타임 재시작을 트리거합니다. 재시작 후에는 1번 셀(패키지 설치)부터 순서대로 실행하세요.


In [None]:

# ================================================================
# 0) Environment Repair (필수) — NumPy/Scipy 핀 고정 + 런타임 재시작
# ================================================================
import sys, subprocess, os, time

def sh(cmd):
    """Echo *cmd* and run it quietly, raising on a non-zero exit status.

    Args:
        cmd: Command as a list of strings (executed without a shell).

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status. The captured stderr is printed before re-raising.
    """
    print(">", " ".join(cmd))
    try:
        # subprocess.run() drains both pipes via communicate(); check_call()
        # with PIPE never reads them, so a chatty child can block forever once
        # the OS pipe buffer fills up.
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        if err.stderr:
            print(err.stderr.decode("utf-8", errors="replace"))
        raise

# Upgrade the build tooling first so that recent wheels install cleanly.
sh([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "pip", "setuptools", "wheel"])

# Pin the core scientific stack (combination the notebook treats as stable on Colab).
# - numpy==1.26.4 / scipy==1.11.4 are broadly compatible with audio/ML wheels.
from importlib import metadata as _metadata

_PINNED = {"numpy": "1.26.4", "scipy": "1.11.4"}


def _installed_version(pkg):
    """Return the installed version string of *pkg*, or None if it is absent."""
    try:
        return _metadata.version(pkg)
    except _metadata.PackageNotFoundError:
        return None


if all(_installed_version(pkg) == ver for pkg, ver in _PINNED.items()):
    # The pins are already in place (e.g. this cell is being re-run after the
    # restart): killing the runtime again would only abort a "Run all".
    print("\n핀 고정된 NumPy/SciPy가 이미 설치되어 있습니다. 재시작 없이 다음 셀로 진행하세요.")
else:
    sh([sys.executable, "-m", "pip", "install", "-q"]
       + [f"{pkg}=={ver}" for pkg, ver in _PINNED.items()])
    print("\n환경 정렬이 완료되었습니다. 런타임을 재시작합니다...")
    # Force the Colab runtime to restart so the newly installed binaries are loaded.
    os.kill(os.getpid(), 9)


> /usr/bin/python3 -m pip install -q --upgrade pip setuptools wheel
> /usr/bin/python3 -m pip install -q numpy==1.26.4 scipy==1.11.4


In [None]:

# ================================================================
# 1) 패키지 설치 (안정 버전 고정) — 재시작 후 실행
# ================================================================
import sys, subprocess, warnings
warnings.filterwarnings("ignore")

def pip_install(pkgs):
    """Best-effort quiet ``pip install`` of *pkgs* (a list of requirement strings).

    Any failure is reported with a printed warning instead of being raised, so
    the caller keeps going even when a package cannot be installed.
    """
    try:
        print("Installing:", pkgs)
        command = [sys.executable, "-m", "pip", "install", "-q"] + pkgs
        subprocess.check_call(command)
    except Exception as err:
        print("설치 중 경고:", err)

# NOTE: numpy/scipy were already pinned by cell 0, so they are not reinstalled here.
pip_install([
    "soundfile==0.12.1",
    "librosa==0.10.2.post1",
    "tqdm==4.66.4",
    "matplotlib==3.8.4",
    "scikit-learn==1.3.2",
    "onnx==1.15.0",
    "onnxruntime==1.17.3",
    "speechbrain==0.5.16",
    "torchmetrics==1.4.0.post0",
    "pydub==0.25.1",
    "jiwer==3.0.4"
])

# Optional TTS back-ends (the rest of the pipeline proceeds even if these fail).
pip_install(["TTS==0.22.0"])
pip_install(["gTTS==2.5.1"])

# Colab detection: IN_COLAB is True only when google.colab can be imported.
# The imported `files` helper is reused later for the upload widget.
try:
    from google.colab import files
    IN_COLAB = True
except Exception:
    IN_COLAB = False

print("IN_COLAB:", IN_COLAB)


Installing: ['soundfile==0.12.1', 'librosa==0.10.2.post1', 'tqdm==4.66.4', 'matplotlib==3.8.4', 'scikit-learn==1.3.2', 'onnx==1.15.0', 'onnxruntime==1.17.3', 'speechbrain==0.5.16', 'torchmetrics==1.4.0.post0', 'pydub==0.25.1', 'jiwer==3.0.4']
Installing: ['TTS==0.22.0']
Installing: ['gTTS==2.5.1']
IN_COLAB: True


In [None]:

# ================================================================
# 2) 설정값
# ================================================================
import os, math, json, random, shutil, time, glob, io
from pathlib import Path
import numpy as np
import soundfile as sf
import librosa
from tqdm import tqdm

# Seed Python's and NumPy's global RNGs (torch's RNG is not seeded in this cell).
SEED = 42
random.seed(SEED); np.random.seed(SEED)

# Audio/feature parameters (same as the runtime)
SR = 16000        # sample rate every clip is loaded/resampled to
CLIP_SEC = 1.28   # clip length in seconds fed to the feature extractor
N_FFT = 512       # FFT size
WIN = 400         # Hann window length in samples (zero-padded up to N_FFT)
HOP = 160         # frame step in samples
N_MELS = 40       # number of mel bands
FMIN = 20.0       # lowest mel filter edge (Hz)
FMAX = 7600.0     # highest mel filter edge (Hz)

# Training parameters
EPOCHS = 25
BATCH_SIZE = 64
LR = 1e-3
WEIGHT_DECAY = 1e-5
PATIENCE = 6

# Data paths (Colab layout)
BASE = Path("/content")
DATA = BASE/"data"
USER_POS = DATA/"user_pos"     # the user's own "로키야" wavs
OTHER_SPK = DATA/"other_spk"   # other speakers' speech (any phrase)
NOISE_DIR = DATA/"noise"       # background-noise wavs (3-second pieces + 1 minute)
TTS_NEG = DATA/"tts_neg"       # TTS hard negatives ("로키야" etc.); may stay empty when unused

# Outputs
ARTIFACTS = BASE/"artifacts"
ARTIFACTS.mkdir(parents=True, exist_ok=True)

# Logging convenience
def say(*a):
    """Print the given values exactly as the built-in ``print`` would."""
    print(*a)


In [None]:

# ================================================================
# 3) 데이터 폴더 준비/업로드
# ================================================================
from pathlib import Path
from zipfile import ZipFile

# Create each input folder (and its parents) if it does not exist yet.
for d in [USER_POS, OTHER_SPK, NOISE_DIR, TTS_NEG]:
    d.mkdir(parents=True, exist_ok=True)

def count_wavs(p: Path):
    """Return how many ``*.wav`` files sit directly inside *p* (non-recursive)."""
    return sum(1 for _ in p.glob("*.wav"))

# Report how many wavs each input folder holds before any upload/extraction.
say("현재 wav 개수:",
    "\n user_pos:", count_wavs(USER_POS),
    "\n other_spk:", count_wavs(OTHER_SPK),
    "\n noise:", count_wavs(NOISE_DIR),
    "\n tts_neg:", count_wavs(TTS_NEG))

def auto_extract_first_zip_to_data():
    """Extract the alphabetically first ``/content/*.zip`` into ``DATA``.

    Returns:
        True when an archive was found and extracted; False when there is no
        archive or extraction failed (the failure is reported via ``say``).
    """
    # Path.glob yields entries in arbitrary filesystem order; sorting makes the
    # choice of "first" archive reproducible when several zips are present.
    archives = sorted(Path("/content").glob("*.zip"))
    if not archives:
        return False
    first = archives[0]
    try:
        with ZipFile(str(first), 'r') as zf:
            zf.extractall(str(DATA))
    except Exception as e:
        # Best effort: a broken archive must not stop the notebook.
        say("압축 해제 스킵:", e)
        return False
    say(f"압축 해제 완료: {first.name}")
    return True

# On Colab, when fewer than 10 wavs exist across user_pos/other_spk/noise,
# prompt for an upload and try to extract the first zip found in /content.
if IN_COLAB and sum(count_wavs(p) for p in [USER_POS, OTHER_SPK, NOISE_DIR]) < 10:
    say("\n데이터가 거의 비어있습니다. 다음 중 하나를 수행하세요:")
    say("  1) 'dataset.zip' 업로드 (data/ 내부 구조 포함)")
    say("  2) 각 폴더에 개별 wav 업로드")
    try:
        uploaded = files.upload()
        auto_extract_first_zip_to_data()
    except Exception as e:
        say("업로드 위젯 사용 불가:", e)

# Dummy data (only when there is no real data at all)
if sum(count_wavs(p) for p in [USER_POS, OTHER_SPK, NOISE_DIR]) == 0:
    say("\n실데이터 없음 → 더미 데이터 생성")
    import numpy as np, soundfile as sf, math
    NOISE_DIR.mkdir(exist_ok=True, parents=True)
    # 20 noise clips: 3 s of low-level Gaussian noise.
    for i in range(20):
        x = np.random.randn(int(SR*3.0))*0.01
        sf.write(NOISE_DIR/f"noise_{i:03d}.wav", x, SR, subtype="PCM_16")
    # 30 "positive" clips: a 440 Hz tone plus noise, CLIP_SEC long.
    for i in range(30):
        t = np.linspace(0, CLIP_SEC, int(SR*CLIP_SEC), endpoint=False)
        y = 0.02*np.sin(2*np.pi*440*t) + 0.006*np.random.randn(t.size)
        sf.write(USER_POS/f"pos_{i:03d}.wav", y, SR, subtype="PCM_16")
    # 12 "negative" clips: a 520 Hz tone plus slightly louder noise.
    for i in range(12):
        t = np.linspace(0, CLIP_SEC, int(SR*CLIP_SEC), endpoint=False)
        y = 0.02*np.sin(2*np.pi*520*t) + 0.01*np.random.randn(t.size)
        sf.write(OTHER_SPK/f"neg_{i:03d}.wav", y, SR, subtype="PCM_16")

# Final per-folder wav counts after upload/extraction/dummy generation.
say("\n최종 wav 개수:",
    "\n user_pos:", count_wavs(USER_POS),
    "\n other_spk:", count_wavs(OTHER_SPK),
    "\n noise:", count_wavs(NOISE_DIR),
    "\n tts_neg:", count_wavs(TTS_NEG))


현재 wav 개수: 
 user_pos: 0 
 other_spk: 0 
 noise: 0 
 tts_neg: 0

데이터가 거의 비어있습니다. 다음 중 하나를 수행하세요:
  1) 'dataset.zip' 업로드 (data/ 내부 구조 포함)
  2) 각 폴더에 개별 wav 업로드


KeyboardInterrupt: 

In [None]:

# ================================================================
# 4) (옵션) TTS 하드 네거티브
# ================================================================
# Candidate texts for synthesized hard negatives: the wake word and close variants.
phrases = [
    "로키야", "헤이 로키", "로키", "로키야?", "로키야아", "록이야", "로기야", "로키야 지금",
    "로키야 듣고 있어?", "헤이 로키 지금 시간", "로키야 뭐해"
]
# Number of TTS clips to generate in total (Coqui first, gTTS for the remainder).
K_TTS_WAVS = 16
TTS_NEG.mkdir(parents=True, exist_ok=True)

def save_wav(path, y, sr=SR):
    """Write samples *y* to *path* as a 16-bit PCM file at sample rate *sr*."""
    from soundfile import write as _write_audio
    _write_audio(path, y, sr, subtype="PCM_16")

# Running count of successfully written TTS clips (shared by both back-ends).
generated = 0
# Attempt 1: Coqui TTS. Any import/model failure falls through to gTTS below.
try:
    from TTS.api import TTS
    say("Coqui TTS 시도...")
    tts = TTS("tts_models/ko/kss/tacotron-DDC")
    tts_out = TTS_NEG/"coqui"; tts_out.mkdir(exist_ok=True)
    while generated < K_TTS_WAVS:
        txt = random.choice(phrases)
        fn = tts_out/f"tts_{generated:03d}.wav"
        try:
            tts.tts_to_file(text=txt, file_path=str(fn))
            # Reload at SR/mono and rewrite as 16-bit PCM so every clip shares one format.
            y, sr = librosa.load(str(fn), sr=SR, mono=True)
            save_wav(fn, y, SR)
            generated += 1
        except Exception as e:
            # Stop at the first synthesis failure instead of retrying.
            say("Coqui 합성 스킵:", e)
            break
except Exception as e:
    say("Coqui TTS 사용 불가:", e)

# Attempt 2: gTTS produces whatever is still missing.
if generated < K_TTS_WAVS:
    try:
        from gtts import gTTS
        from pydub import AudioSegment
        say("gTTS 시도...")
        gtts_out = TTS_NEG/"gtts"; gtts_out.mkdir(exist_ok=True)
        for i in range(generated, K_TTS_WAVS):
            txt = random.choice(phrases)
            mp3_path = gtts_out/f"tts_{i:03d}.mp3"
            wav_path = gtts_out/f"tts_{i:03d}.wav"
            try:
                # gTTS writes MP3; convert to SR/mono wav with pydub, then
                # normalize the file format through librosa + save_wav.
                gTTS(txt, lang="ko").save(str(mp3_path))
                audio = AudioSegment.from_file(str(mp3_path), format="mp3")
                audio = audio.set_frame_rate(SR).set_channels(1)
                audio.export(str(wav_path), format="wav")
                y, sr = librosa.load(str(wav_path), sr=SR, mono=True)
                save_wav(wav_path, y, SR)
                generated += 1
            except Exception as e:
                # Stop at the first synthesis failure instead of retrying.
                say("gTTS 합성 스킵:", e)
                break
    except Exception as e:
        say("gTTS 사용 불가:", e)

say(f"TTS 하드네거티브 생성 개수: {generated}")


Coqui TTS 시도...
Coqui TTS 사용 불가: 'ko'
gTTS 시도...
TTS 하드네거티브 생성 개수: 16


In [None]:

# ================================================================
# 5) 피처(Log-Mel) — 런타임 동일
# ================================================================
import numpy as np

def hz_to_mel(hz: float) -> float:
    """Convert a frequency in Hz to mels using ``2595 * log10(1 + hz / 700)``."""
    ratio = hz / 700.0
    return 2595.0 * np.log10(1.0 + ratio)

def mel_to_hz(mel: float) -> float:
    """Convert mels back to Hz using ``700 * (10 ** (mel / 2595) - 1)``.

    Also works element-wise when *mel* is a NumPy array.
    """
    exponent = mel / 2595.0
    return 700.0 * (10.0**exponent - 1.0)

def build_mel_filter(sr=SR, n_fft=N_FFT, n_mels=N_MELS, fmin=FMIN, fmax=FMAX):
    """Build a triangular mel filterbank of shape ``(n_mels, n_fft // 2 + 1)``.

    The band edges are ``n_mels + 2`` points spaced evenly on the mel scale
    between *fmin* and *fmax*, mapped to FFT bin indices with
    ``floor((n_fft + 1) * f / sr)``. Returns a float32 array.

    NOTE(review): the cell header states these features are identical to the
    runtime, so the exact bin arithmetic below presumably must not change
    without changing the runtime too -- confirm before editing.
    """
    m_min = hz_to_mel(fmin)
    m_max = hz_to_mel(fmax)
    m_pts = np.linspace(m_min, m_max, n_mels + 2)
    f_pts = mel_to_hz(m_pts)
    fft_bins = np.floor((n_fft + 1) * f_pts / sr).astype(int)
    fb = np.zeros((n_mels, n_fft // 2 + 1), dtype=np.float32)
    for m in range(1, n_mels + 1):
        # Left edge, peak and right edge bins of band m.
        f_m_minus, f_m, f_m_plus = fft_bins[m - 1], fft_bins[m], fft_bins[m + 1]
        # Nudge coinciding edges apart; the max(..., 1) guards below keep the
        # slope denominators non-zero either way.
        if f_m_minus == f_m: f_m -= 1
        if f_m == f_m_plus: f_m_plus += 1
        # Rising slope from the left edge up to the peak (clamped to valid bins).
        for k in range(max(f_m_minus, 0), min(f_m, n_fft // 2) + 1):
            fb[m - 1, k] = (k - f_m_minus) / float(max(f_m - f_m_minus, 1))
        # Falling slope from the peak to the right edge; this loop runs second
        # and therefore overwrites the value written at the peak bin.
        for k in range(max(f_m, 0), min(f_m_plus, n_fft // 2) + 1):
            fb[m - 1, k] = (f_m_plus - k) / float(max(f_m_plus - f_m, 1))
    return fb

# Filterbank built once with the default settings and reused for every clip.
MEL_FB = build_mel_filter()

def pad_or_trim(y: np.ndarray, target_len: int) -> np.ndarray:
    """Return *y* with exactly *target_len* samples.

    Shorter input is zero-padded at the end; longer (or equal) input is cut
    to its first *target_len* samples.
    """
    shortfall = target_len - y.size
    if shortfall > 0:
        return np.pad(y, (0, shortfall))
    return y[:target_len]

def logmel_from_float32(y: np.ndarray, clip_sec=CLIP_SEC):
    """Compute a standardized log-mel spectrogram for one clip.

    The waveform is padded/trimmed to ``int(SR * clip_sec)`` samples, framed
    with an ``N_FFT``-sample buffer every ``HOP`` samples, converted to a power
    spectrum, projected through ``MEL_FB``, converted to dB and standardized
    with the clip's own mean and standard deviation.

    Returns:
        float32 array of shape ``(1, 1, n_mels, frames)``.

    NOTE(review): the cell header states these features are identical to the
    runtime, so the numerics here presumably must stay in sync with it.
    """
    y = y.astype(np.float32)
    target_len = int(SR * clip_sec)
    y = pad_or_trim(y, target_len)
    # Zero-pad half an FFT frame on both ends before framing.
    pad = N_FFT // 2
    ypad = np.pad(y, (pad, pad), mode='constant')
    # Hann window of WIN samples placed at the start of an N_FFT buffer; the
    # remaining N_FFT - WIN taps stay zero.
    win_buf = np.zeros(N_FFT, dtype=np.float32)
    win_buf[:WIN] = np.hanning(WIN).astype(np.float32)
    frames = 1 + (ypad.size - N_FFT) // HOP
    if frames < 1: frames = 1
    S = np.empty((N_FFT // 2 + 1, frames), dtype=np.float32)
    for i in range(frames):
        s = i * HOP
        frame = ypad[s:s + N_FFT]
        # Defensive: zero-extend a short trailing frame.
        if frame.size < N_FFT:
            frame = np.pad(frame, (0, N_FFT - frame.size))
        frame = frame * win_buf
        spec = np.fft.rfft(frame, n=N_FFT)
        # Power spectrum |X|^2 of this frame.
        S[:, i] = (spec.real**2 + spec.imag**2)
    M = np.dot(MEL_FB, S)
    # Floor before the log to avoid log10(0).
    M = np.maximum(M, 1e-10)
    M_db = 10.0 * np.log10(M)
    # Per-clip standardization (zero mean, unit variance over the whole spectrogram).
    M_db = (M_db - M_db.mean()) / (M_db.std() + 1e-6)
    return M_db[np.newaxis, np.newaxis, :, :].astype(np.float32)


In [None]:

# ================================================================
# 6) 데이터 증강
# ================================================================
import numpy as np, glob, random, librosa

def rand_gain(y, low_db=-6, high_db=6, p=0.7):
    """With probability *p*, scale *y* by a random gain drawn in dB.

    The gain is uniform in ``[low_db, high_db]`` dB and the scaled signal is
    returned as float32. When the augmentation is skipped, *y* itself is
    returned unchanged.
    """
    skip = np.random.rand() > p
    if skip:
        return y
    gain_db = np.random.uniform(low_db, high_db)
    factor = 10 ** (gain_db / 20.0)
    scaled = y * factor
    return scaled.astype(np.float32)

def rand_shift(y, max_ms=80, p=0.7):
    """With probability *p*, shift *y* in time by up to ``max_ms`` milliseconds.

    A non-negative offset delays the signal (zeros enter at the front), a
    negative one advances it (zeros enter at the end). The length is kept and
    the result is float32. When skipped, *y* itself is returned unchanged.
    """
    if np.random.rand() > p:
        return y
    limit = int(SR * max_ms / 1000.0)
    offset = np.random.randint(-limit, limit + 1)
    n = y.size
    delayed = offset >= 0
    lead, trail = (offset, 0) if delayed else (0, -offset)
    padded = np.pad(y, (lead, trail))
    shifted = padded[:n] if delayed else padded[-n:]
    return shifted.astype(np.float32)

def rand_speed(y, low=0.90, high=1.10, p=0.5):
    """With probability *p*, time-stretch *y* by a random rate in ``[low, high]``.

    The stretched signal is zero-padded or truncated back to the original
    length and returned as float32. When skipped, *y* itself is returned
    unchanged.
    """
    if np.random.rand() > p:
        return y
    rate = np.random.uniform(low, high)
    stretched = librosa.effects.time_stretch(y=y.astype(np.float32), rate=rate)
    n = y.size
    missing = n - stretched.size
    if missing > 0:
        stretched = np.pad(stretched, (0, missing))
    else:
        stretched = stretched[:n]
    return stretched.astype(np.float32)

def rand_pitch(y, steps_low=-1.0, steps_high=1.0, p=0.5):
    """With probability *p*, pitch-shift *y* by a random number of semitone steps.

    The step count is uniform in ``[steps_low, steps_high]``. The result is
    truncated or zero-padded to the original length and returned as float32.
    When skipped, *y* itself is returned unchanged.
    """
    if np.random.rand() > p:
        return y
    n = y.size
    steps = np.random.uniform(steps_low, steps_high)
    shifted = librosa.effects.pitch_shift(y.astype(np.float32), sr=SR, n_steps=steps)[:n]
    deficit = n - shifted.size
    if deficit > 0:
        shifted = np.pad(shifted, (0, deficit))
    return shifted.astype(np.float32)

def rand_reverb(y, p=0.3):
    """With probability *p*, convolve *y* with a short decaying impulse response.

    The impulse response is ``exp(-6 t)`` sampled over 0.08 s worth of samples
    and normalized by its sum; the convolution is cut back to the input
    length and returned as float32. When skipped, *y* itself is returned
    unchanged.
    """
    if np.random.rand() > p:
        return y
    n_taps = int(SR*0.08)
    decay = np.exp(-6*np.linspace(0, 1, n_taps)).astype(np.float32)
    decay /= (np.sum(decay)+1e-6)
    wet = np.convolve(y.astype(np.float32), decay, mode="full")
    return wet[:y.size].astype(np.float32)

# Pre-load every noise clip once so that rand_noise_mix can sample from memory.
# Unreadable or empty files are reported instead of being dropped silently,
# because an empty pool disables noise augmentation altogether.
NOISE_POOL = []
for wav in sorted(glob.glob(str(NOISE_DIR/"*.wav"))):
    try:
        y, sr = librosa.load(wav, sr=SR, mono=True)
    except Exception as e:
        say("노이즈 파일 로드 스킵:", wav, e)
        continue
    if y.size == 0:
        # A zero-length clip cannot be tiled or cropped to the signal length.
        say("노이즈 파일이 비어 있어 스킵:", wav)
        continue
    NOISE_POOL.append(y.astype(np.float32))
if not NOISE_POOL:
    say("경고: 노이즈 풀이 비어 있어 노이즈 증강이 적용되지 않습니다.")

def rand_noise_mix(y, snr_db_low=0, snr_db_high=20, p=0.7, noise_pool=None):
    """With probability *p*, add a random noise clip to *y* at a random SNR.

    Args:
        y: Input waveform.
        snr_db_low: Lower bound of the uniformly drawn SNR in dB.
        snr_db_high: Upper bound of the uniformly drawn SNR in dB.
        p: Probability of applying the augmentation.
        noise_pool: Optional list of noise waveforms to sample from; defaults
            to the module-level ``NOISE_POOL``.

    Returns:
        The mixture clipped to ``[-1, 1]`` as float32, or *y* itself when the
        augmentation is skipped, the pool is empty, or the chosen noise clip
        has no samples.
    """
    pool = NOISE_POOL if noise_pool is None else noise_pool
    if not pool or np.random.rand() > p:
        return y
    n = random.choice(pool)
    if n.size == 0:
        # An empty clip would make the tiling factor below divide by zero.
        return y
    if n.size < y.size:
        # Noise shorter than the signal: repeat it until it covers the signal.
        k = int(np.ceil(y.size / n.size))
        n = np.tile(n, k)[:y.size]
    else:
        # Noise at least as long as the signal: take a random crop.
        s = np.random.randint(0, n.size - y.size + 1)
        n = n[s:s+y.size]
    # The small epsilons keep the power ratio finite for silent inputs.
    sig_pow = np.mean(y**2) + 1e-9
    noise_pow = np.mean(n**2) + 1e-9
    snr_db = np.random.uniform(snr_db_low, snr_db_high)
    target_noise_pow = sig_pow / (10**(snr_db/10.0))
    n = n * np.sqrt(target_noise_pow / noise_pow)
    y2 = y + n
    y2 = np.clip(y2, -1.0, 1.0).astype(np.float32)
    return y2

def augment(y):
    """Run *y* through the full augmentation chain and return it as float32.

    Stages are applied in this fixed order: gain, time shift, speed, pitch,
    reverb, noise mix. Each stage decides on its own whether to fire.
    """
    pipeline = (rand_gain, rand_shift, rand_speed, rand_pitch, rand_reverb, rand_noise_mix)
    for stage in pipeline:
        y = stage(y)
    return y.astype(np.float32)


In [None]:

# ================================================================
# 7) 데이터셋/로더
# ================================================================
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import glob

def load_wav_mono(path, sr=SR):
    """Load *path* as a mono float32 waveform resampled to *sr*.

    Clips whose absolute peak exceeds 1.0 are scaled down so the peak becomes
    1.0; quieter clips are returned at their original level (never amplified).
    A file with no samples yields an empty array instead of raising.
    """
    y, _ = librosa.load(str(path), sr=sr, mono=True)
    if y.size:
        # np.max() on an empty array raises ValueError, hence the size guard.
        peak = float(np.max(np.abs(y)))
        if peak > 1.0:
            y = y / peak
    return y.astype(np.float32)

# Positives: the user's wake-word recordings.
pos_files = sorted(glob.glob(str(USER_POS/"*.wav")))
# Negatives: other speakers, every TTS clip (searched recursively), and the noise clips.
neg_files = sorted(glob.glob(str(OTHER_SPK/"*.wav"))) + sorted(glob.glob(str(TTS_NEG/"**/*.wav"), recursive=True))
for nwav in sorted(glob.glob(str(NOISE_DIR/"*.wav"))):
    neg_files.append(nwav)

say(f"양성 {len(pos_files)} | 음성 {len(neg_files)}")

class KWSDataset2(Dataset):
    """Dataset of ``(wav_path, label)`` pairs yielding log-mel features.

    Each item is a tuple ``(x, ylab)`` where ``x`` is the log-mel tensor of a
    ``CLIP_SEC``-long excerpt and ``ylab`` is a one-element float32 tensor
    holding the label. With ``augment_on`` the excerpt is cropped at a random
    position and passed through ``augment``; otherwise it is center-cropped.
    """

    def __init__(self, pairs, augment_on=False):
        self.pairs = pairs
        self.augment_on = augment_on

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        wav_path, label = self.pairs[idx]
        audio = load_wav_mono(wav_path)
        n_target = int(SR*CLIP_SEC)
        surplus = audio.size - n_target
        if surplus < 0:
            # Too short: zero-pad at the end.
            audio = np.pad(audio, (0, -surplus))
        elif surplus > 0:
            # Too long: random crop while augmenting, center crop otherwise.
            if self.augment_on:
                start = np.random.randint(0, surplus + 1)
            else:
                start = surplus//2
            audio = audio[start:start+n_target]
        if self.augment_on:
            audio = augment(audio)
        feats = logmel_from_float32(audio)
        x = torch.from_numpy(feats).squeeze(0)  # [1,n_mels,T]
        ylab = torch.tensor([label], dtype=torch.float32)
        return x, ylab

from sklearn.model_selection import train_test_split
# 80/20 split done separately per class, so both splits contain both classes.
pos_tr, pos_va = train_test_split(pos_files, test_size=0.2, random_state=42, shuffle=True)
neg_tr, neg_va = train_test_split(neg_files, test_size=0.2, random_state=42, shuffle=True)

# (path, label) pairs: 1 = wake word by the user, 0 = everything else.
train_files = [(f,1) for f in pos_tr] + [(f,0) for f in neg_tr]
val_files   = [(f,1) for f in pos_va] + [(f,0) for f in neg_va]
import random as pyrand
pyrand.shuffle(train_files); pyrand.shuffle(val_files)

# Augmentation is applied to the training split only.
train_ds = KWSDataset2(train_files, augment_on=True)
val_ds   = KWSDataset2(val_files,   augment_on=False)

# Per-sample weights that give each class a total sampling mass of 0.5, so
# draws are class-balanced in expectation regardless of the class counts.
pos_w = 0.5 / max(1, sum(1 for _,l in train_files if l==1))
neg_w = 0.5 / max(1, sum(1 for _,l in train_files if l==0))
weights = [pos_w if l==1 else neg_w for _,l in train_files]
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)

# NOTE: this overrides the BATCH_SIZE from the settings cell.
BATCH_SIZE = min(64, max(8, len(train_ds)//8))  # auto-scaled to the dataset size (safety)
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, sampler=sampler, num_workers=2, pin_memory=True)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=True)

say(f"Train: {len(train_ds)} | Val: {len(val_ds)} | Batch: {BATCH_SIZE}")


양성 97 | 음성 54
Train: 120 | Val: 31 | Batch: 15


In [None]:

# ================================================================
# 8) 모델
# ================================================================
import torch, torch.nn as nn

class ConvBNReLU(nn.Module):
    """Bias-free 2-D convolution followed by batch norm and an in-place ReLU.

    Args:
        in_c: Number of input channels.
        out_c: Number of output channels.
        k: Kernel size.
        s: Stride.
        p: Padding.
    """

    def __init__(self, in_c, out_c, k=3, s=1, p=1):
        super().__init__()
        self.conv = nn.Conv2d(in_c, out_c, kernel_size=k, stride=s, padding=p, bias=False)
        self.bn = nn.BatchNorm2d(out_c)
        self.act = nn.ReLU(inplace=True)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        return self.act(out)

class KWSNet(nn.Module):
    """Small CNN mapping a ``[B, 1, n_mels, T]`` log-mel input to one logit.

    Four ``ConvBNReLU`` stages (16, 32, 64, 96 channels); the first three are
    each followed by 2x2 max pooling, the last by global average pooling, and
    a linear head produces a single logit per example. ``n_mels`` is accepted
    for interface compatibility but is not used by the layers.
    """

    def __init__(self, n_mels=N_MELS):
        super().__init__()
        widths = (1, 16, 32, 64, 96)
        last_stage = len(widths) - 2
        stages = []
        for i, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
            stages.append(ConvBNReLU(c_in, c_out, 3, 1, 1))
            if i < last_stage:
                stages.append(nn.MaxPool2d((2,2)))
        stages.append(nn.AdaptiveAvgPool2d((1,1)))
        self.feat = nn.Sequential(*stages)
        self.head = nn.Linear(96, 1)

    def forward(self, x):
        pooled = self.feat(x)
        flat = pooled.view(pooled.size(0), -1)
        return self.head(flat)

import torch
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = KWSNet().to(device)
# Total number of parameter elements in the model.
print("Params:", sum(p.numel() for p in model.parameters()))


Params: 78993


In [None]:

# ================================================================
# 9) 학습
# ================================================================
import torch
import torch.nn as nn
from torch.optim import AdamW
from torchmetrics.classification import BinaryAUROC, BinaryF1Score, BinaryRecall, BinaryPrecision

# Metric objects shared by the training and validation passes.
auroc_metric = BinaryAUROC().to(device)
f1_metric = BinaryF1Score().to(device)
rec_metric = BinaryRecall().to(device)
prec_metric = BinaryPrecision().to(device)

# Hyper-parameters come from the settings cell (LR, WEIGHT_DECAY, PATIENCE,
# EPOCHS) rather than being repeated here as literals, so editing the settings
# cell actually changes the training run.
optimizer = AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
criterion = nn.BCEWithLogitsLoss()

# Early-stopping bookkeeping: best validation score so far, a CPU copy of the
# matching weights, and the number of consecutive epochs without improvement.
best_val = -1.0
best_state = None
no_improve = 0

def run_epoch(loader, train=True):
    """Run one pass over *loader* and return the loss and metrics.

    Args:
        loader: DataLoader yielding ``(x, y)`` batches.
        train: When True the model is put in training mode and the optimizer
            is stepped on every batch; when False the pass is evaluation-only
            and no autograd graph is built.

    Returns:
        Tuple ``(loss_avg, auroc, f1, recall, precision, y_true, y_prob)``
        where the last two are flat NumPy arrays of labels and sigmoid
        probabilities. F1/recall/precision use a 0.5 probability threshold.
    """
    model.train(train)
    total_loss = 0.0
    n_seen = 0
    y_true, y_prob = [], []
    for x, y in loader:
        x = x.to(device); y = y.to(device)
        if train:
            optimizer.zero_grad(set_to_none=True)
        # Gradients are tracked only while training; evaluation skips the
        # graph construction (and its memory) entirely.
        with torch.set_grad_enabled(train):
            logit = model(x)
            loss = criterion(logit, y)
        if train:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
            optimizer.step()
        batch = x.size(0)
        total_loss += loss.item() * batch
        n_seen += batch
        y_true.append(y.detach().cpu())
        y_prob.append(torch.sigmoid(logit.detach()).cpu())
    y_true = torch.cat(y_true).view(-1)
    y_prob = torch.cat(y_prob).view(-1)
    auroc = auroc_metric(y_prob, y_true.int()).item()
    y_pred = (y_prob >= 0.5).int()
    f1 = f1_metric(y_pred, y_true.int()).item()
    rec = rec_metric(y_pred, y_true.int()).item()
    prc = prec_metric(y_pred, y_true.int()).item()
    # Average over the samples actually processed, which stays correct even if
    # the sampler draws a different number of items than len(loader.dataset).
    loss_avg = total_loss / max(1, n_seen)
    return loss_avg, auroc, f1, rec, prc, y_true.numpy(), y_prob.numpy()

# Train for up to EPOCHS epochs with early stopping on validation AUROC.
for epoch in range(1, EPOCHS+1):
    tr = run_epoch(train_loader, train=True)
    va = run_epoch(val_loader, train=False)
    print(f"[{epoch:02d}] "
          f"train loss {tr[0]:.4f} auroc {tr[1]:.3f} f1 {tr[2]:.3f} rec {tr[3]:.3f} prc {tr[4]:.3f} | "
          f"val loss {va[0]:.4f} auroc {va[1]:.3f} f1 {va[2]:.3f} rec {va[3]:.3f} prc {va[4]:.3f}")
    # Model selection criterion: validation AUROC (index 1 of the result tuple).
    score = va[1]
    if score > best_val:
        best_val = score
        # Keep a detached CPU copy so later epochs cannot mutate the snapshot.
        best_state = {k:v.cpu().clone() for k,v in model.state_dict().items()}
        no_improve = 0
    else:
        no_improve += 1
        if no_improve >= PATIENCE:
            print("조기 종료")
            break

# Restore the best-scoring weights and switch to inference mode.
if best_state is not None:
    model.load_state_dict(best_state)
model.eval()


[01] train loss 0.7092 auroc 0.633 f1 0.708 rec 0.950 prc 0.564 | val loss 0.6674 auroc 0.255 f1 0.784 rec 1.000 prc 0.645
[02] train loss 0.5678 auroc 0.858 f1 0.753 rec 0.917 prc 0.640 | val loss 0.5982 auroc 0.782 f1 0.851 rec 1.000 prc 0.741
[03] train loss 0.4959 auroc 0.854 f1 0.803 rec 0.791 prc 0.815 | val loss 0.5289 auroc 0.832 f1 0.870 rec 1.000 prc 0.769
[04] train loss 0.4036 auroc 0.930 f1 0.841 rec 0.882 prc 0.804 | val loss 0.8371 auroc 0.832 f1 0.784 rec 1.000 prc 0.645
[05] train loss 0.3890 auroc 0.951 f1 0.864 rec 0.836 prc 0.895 | val loss 0.4220 auroc 0.909 f1 0.870 rec 1.000 prc 0.769
[06] train loss 0.4407 auroc 0.890 f1 0.863 rec 0.845 prc 0.882 | val loss 0.3904 auroc 0.945 f1 0.870 rec 1.000 prc 0.769
[07] train loss 0.3678 auroc 0.944 f1 0.879 rec 0.855 prc 0.904 | val loss 0.3547 auroc 0.982 f1 0.952 rec 1.000 prc 0.909
[08] train loss 0.3003 auroc 0.974 f1 0.921 rec 0.889 prc 0.955 | val loss 0.3161 auroc 0.973 f1 0.976 rec 1.000 prc 0.952
[09] train loss 

KWSNet(
  (feat): Sequential(
    (0): ConvBNReLU(
      (conv): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act): ReLU(inplace=True)
    )
    (1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (2): ConvBNReLU(
      (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act): ReLU(inplace=True)
    )
    (3): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (4): ConvBNReLU(
      (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act): ReLU(inplace=True)
    )
    (5): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), 

In [None]:

# ================================================================
# 10) 임계값 선택 & ONNX 내보내기
# ================================================================
import numpy as np, torch, onnxruntime as ort

# Collect validation labels and sigmoid probabilities.
with torch.no_grad():
    ys, ps = [], []
    for x, y in val_loader:
        x = x.to(device)
        p = torch.sigmoid(model(x)).cpu().numpy().ravel()
        ys.append(y.numpy().ravel()); ps.append(p)
    ys = np.concatenate(ys); ps = np.concatenate(ps)

# Sweep 200 thresholds in [0.50, 0.995] and keep the one with the highest F1
# among those whose false-positive rate is at most 1.5 %. Because the
# comparison is strict, ties go to the lowest such threshold.
# NOTE(review): if no candidate meets the FPR limit, best_th stays at its 0.9
# default and best_tuple stays None -- confirm this fallback is intended.
cands = np.linspace(0.50, 0.995, 200)
best_f1 = -1; best_th = 0.9; best_tuple = None
for th in cands:
    pred = (ps >= th).astype(np.int32)
    tp = np.sum((pred==1)&(ys==1))
    fp = np.sum((pred==1)&(ys==0))
    tn = np.sum((pred==0)&(ys==0))
    fn = np.sum((pred==0)&(ys==1))
    prec = tp / max(1, (tp+fp))
    rec  = tp / max(1, (tp+fn))
    f1 = 0.0 if (prec+rec)==0 else (2*prec*rec)/(prec+rec)
    fpr = fp / max(1, (fp+tn))
    if fpr <= 0.015 and f1 > best_f1:
        best_f1 = f1; best_th = float(th); best_tuple = (prec, rec, f1, fpr)
print("선택된 KWS 임계값:", best_th, "| val [prec,rec,f1,fpr]=", best_tuple)

# ONNX export
# The dummy input fixes the exported shape to [1, 1, N_MELS, frames]; no
# dynamic axes are requested.
frames = 1 + int((SR*CLIP_SEC)//HOP)
dummy = torch.randn(1,1,N_MELS,frames, device=device)
onnx_path = "/content/artifacts/kws_model.onnx"
import torch
torch.onnx.export(model, dummy, onnx_path,
                  input_names=["input"], output_names=["output"],
                  opset_version=13, do_constant_folding=True)
print("ONNX saved:", onnx_path)

# Smoke test: run the exported model once on CPU with random input.
sess = ort.InferenceSession(onnx_path, providers=['CPUExecutionProvider'])
out = sess.run(["output"], {"input": np.random.randn(1,1,N_MELS,frames).astype(np.float32)})[0]
print("ONNX output shape:", out.shape)


선택된 KWS 임계값: 0.6019849246231156 | val [prec,rec,f1,fpr]= (1.0, 1.0, 1.0, 0.0)
ONNX saved: /content/artifacts/kws_model.onnx
ONNX output shape: (1, 1)


In [None]:

# ================================================================
# 11) 화자 임베딩 및 spk_thresh
# ================================================================
import numpy as np, torch, glob, random
from speechbrain.pretrained import EncoderClassifier

# Load the pretrained SpeechBrain ECAPA speaker-recognition model; its files
# are stored under the artifacts directory.
ecapa = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="/content/artifacts/ecapa_cache"
)

def embed_file(path: str):
    """Return the L2-normalized ECAPA speaker embedding of the audio at *path*.

    The file is loaded as mono at ``SR``, encoded as a batch of one without
    gradient tracking, and the resulting vector is scaled to unit length.
    """
    samples, _ = librosa.load(path, sr=SR, mono=True)
    batch = torch.from_numpy(samples).float().unsqueeze(0)
    with torch.no_grad():
        encoded = ecapa.encode_batch(batch)
        vec = encoded.squeeze(0).squeeze(0).cpu().numpy().astype(np.float32)
    length = np.linalg.norm(vec)
    return vec / (length + 1e-9)

# Enrollment material: the user's recordings; impostors: other speakers + TTS clips.
pos_files = sorted(glob.glob(str(USER_POS/"*.wav")))
neg_spk_files = sorted(glob.glob(str(OTHER_SPK/"*.wav"))) + sorted(glob.glob(str(TTS_NEG/"**/*.wav"), recursive=True))

# Embed every enrollment file. Failures are reported (not silently dropped)
# and only ordinary exceptions are caught, so an interrupt still stops the cell.
user_vecs = []
for f in pos_files:
    try:
        user_vecs.append(embed_file(f))
    except Exception as e:
        say("사용자 임베딩 스킵:", f, e)
if not user_vecs:
    # Keep the pipeline runnable, but make it obvious that the saved voice
    # print is a random placeholder and not the user's voice.
    say("경고: 사용자 임베딩을 만들 수 없어 랜덤 벡터로 대체합니다. 화자 게이트가 무의미해집니다.")
    user_vecs.append(np.random.randn(192).astype(np.float32))
# Mean of the per-file unit embeddings, re-normalized to unit length.
user_embed = np.mean(user_vecs, axis=0).astype(np.float32)
user_embed = user_embed / (np.linalg.norm(user_embed)+1e-9)

def cos(a,b):
    """Return the cosine similarity of vectors *a* and *b* as a Python float.

    A tiny epsilon is added to each norm so zero vectors yield 0.0 instead of
    a division by zero.
    """
    norm_a = np.linalg.norm(a)+1e-9
    norm_b = np.linalg.norm(b)+1e-9
    return float(np.dot(a,b)/norm_a/norm_b)

# Similarity of (up to 50) user recordings to the user embedding.
# NOTE(review): these files also contributed to user_embed, so pos_mu is
# likely optimistic -- consider held-out recordings.
pos_sims = []
sample_pos = pos_files if len(pos_files)<=50 else random.sample(pos_files, 50)
for f in sample_pos:
    try:
        pos_sims.append(cos(embed_file(f), user_embed))
    except Exception as e:
        say("양성 유사도 스킵:", f, e)

# Similarity of (up to 200) impostor clips to the user embedding.
neg_vecs = []
for f in neg_spk_files[:200]:
    try:
        neg_vecs.append(embed_file(f))
    except Exception as e:
        say("음성 유사도 스킵:", f, e)
# No placeholder entry here: an empty list must fall through to the default
# below, otherwise the 0.3 fallback for neg_mu can never be used.
neg_sims = [cos(v, user_embed) for v in neg_vecs]

# Means with defaults for the "no data" case, then the midpoint minus a small
# margin, clamped to [0.30, 0.95].
pos_mu = float(np.mean(pos_sims)) if pos_sims else 0.7
neg_mu = float(np.mean(neg_sims)) if neg_sims else 0.3
spk_thresh = max(0.30, min(0.95, (pos_mu + neg_mu)/2.0 - 0.02))
print(f"spk_thresh={spk_thresh:.3f} | pos_mu={pos_mu:.3f} neg_mu={neg_mu:.3f}")

np.save("/content/artifacts/user_embed.npy", user_embed)
print("Saved: /content/artifacts/user_embed.npy")


hyperparams.yaml: 0.00B [00:00, ?B/s]

embedding_model.ckpt:   0%|          | 0.00/83.3M [00:00<?, ?B/s]

mean_var_norm_emb.ckpt:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

classifier.ckpt:   0%|          | 0.00/5.53M [00:00<?, ?B/s]

label_encoder.txt: 0.00B [00:00, ?B/s]

spk_thresh=0.410 | pos_mu=0.797 neg_mu=0.062
Saved: /content/artifacts/user_embed.npy


In [None]:

# ================================================================
# 12) thresholds.json + 다운로드
# ================================================================
import json, os, shutil
# Thresholds and feature parameters saved next to the exported model.
thr = {
    "kws_thresh": float(best_th),
    "clip_sec": float(CLIP_SEC),
    "n_mels": int(N_MELS),
    "fmin": float(FMIN),
    "fmax": float(FMAX),
    "spk_thresh": float(spk_thresh),
    "note": "auto-generated v1.1"
}
with open("/content/artifacts/thresholds.json","w",encoding="utf-8") as f:
    json.dump(thr, f, ensure_ascii=False, indent=2)
print("Saved: /content/artifacts/thresholds.json")

# Bundle
# The archive is built outside the artifacts directory and moved in afterwards.
# Writing it directly inside the directory being archived would make the zip
# pick up itself, and a bundle left over from a previous run would be packed
# into the new one.
import shutil
zip_path = "/content/artifacts/models_bundle.zip"
if os.path.exists(zip_path):
    os.remove(zip_path)
staged_zip = shutil.make_archive("/content/models_bundle_staging", "zip", "/content/artifacts")
shutil.move(staged_zip, zip_path)
print("Bundled:", zip_path)

# Offer the artifacts for download when running on Colab; skipped elsewhere.
try:
    from google.colab import files
    files.download("/content/artifacts/kws_model.onnx")
    files.download("/content/artifacts/thresholds.json")
    files.download("/content/artifacts/user_embed.npy")
    files.download(zip_path)
except Exception as e:
    print("자동 다운로드 스킵:", e)

print("배치 경로 권장: /home/okj1812/models/")


Saved: /content/artifacts/thresholds.json
Bundled: /content/artifacts/models_bundle.zip


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

배치 경로 권장: /home/okj1812/models/
