# Speech Commands (BROWSER_FFT互換) カスタム学習 → TFJS(model.json + metadata.json)

このノートは **@tensorflow-models/speech-commands の `BROWSER_FFT`** と整合するように、
ブラウザ側の FFT スペクトログラム（WebAudio + AnalyserNode相当）を **Python側でも再現**して学習し、
最後に **TFJS 形式の `model.json` + `metadata.json`** を出力します。

対象ラベル（9クラス）:
- up, down, left, right, go, stop, asial, unknown, background_noise

> 重要: ここでは **44.1kHz / fftSize=1024 / 約1秒(43フレーム)** を前提にします。

In [None]:
#@title 0) インストール（Colab向け：依存衝突を避ける版）
# Setup cell (Colab): upgrade pip, install the pinned training dependencies,
# then install the TFJS converter without letting pip resolve its dependencies.
!pip -q install -U pip

# Packages required for training
!pip -q install \
  "tensorflow==2.19.0" \
  "tensorflow-decision-forests==1.12.0" \
  "librosa==0.10.1" "soundfile==0.12.1" \
  "tqdm>=4.67" "scikit-learn>=1.6"

# TFJS conversion tool (installed with --no-deps, so pip does not resolve its dependencies)
!pip -q install "tensorflowjs==4.22.0" --no-deps

import sys, tensorflow as tf
# Print the interpreter and TensorFlow versions that are actually in use.
print("Python:", sys.version)
print("TensorFlow:", tf.__version__)

In [None]:
#@title 1) 設定（BROWSER_FFT互換パラメータ）
import random
from pathlib import Path

# Seed Python's RNG; the later cells use `random` for sampling and shuffling.
SEED = 42
random.seed(SEED)

# Class vocabulary: six command words, the custom word, and two catch-all classes.
COMMAND_WORDS = ["up", "down", "left", "right", "go", "stop"]
CUSTOM_WORD = "asial"
LABELS = [*COMMAND_WORDS, CUSTOM_WORD, "unknown", "background_noise"]

# ---- BROWSER_FFT assumptions ----
SR = 44100
FFT_SIZE = 1024
FRAME_LEN = FFT_SIZE
NUM_FRAMES = 43                        # floor(44100 / 1024) = 43
CLIP_SAMPLES = FRAME_LEN * NUM_FRAMES  # 44032

FULL_BINS = FFT_SIZE // 2              # 512 (i.e. without the Nyquist bin)
BINS = 232                             # 512 also works, at a higher compute cost
assert BINS <= FULL_BINS

# Working directories: raw inputs, standardized clips, and the noise pool.
WORK = Path("/content/kws")
RAW, STD, NOISE_POOL = (WORK / sub for sub in ("raw", "std", "noise_pool"))
for p in (RAW, STD, NOISE_POOL):
    p.mkdir(parents=True, exist_ok=True)

print(f"LABELS: {LABELS}")
print(f"SR: {SR} FFT_SIZE: {FFT_SIZE} NUM_FRAMES: {NUM_FRAMES} BINS: {BINS}")
print(f"WORK: {WORK}")

## 2) Speech Commands を取得して raw に配置
- COMMAND_WORDS: up/down/left/right/go/stop
- unknown: それ以外の単語からランダム抽出
- background_noise: `_background_noise_` を 1秒にスライス（PCM16で保存）

In [None]:
#@title 2-1) Speech Commands v0.02 ダウンロード＆展開
import shutil
import tarfile, urllib.request

speech_tar = WORK / "speech_commands_v0.02.tar.gz"
speech_root = WORK / "speech_commands_v0.02"

# Both the download and the extraction write to a temporary name and are
# renamed only after they finish. The existence checks below therefore never
# mistake a half-downloaded archive or a half-extracted tree for a complete one.
if not speech_root.exists():
    if not speech_tar.exists():
        url = "https://storage.googleapis.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz"
        print("Downloading:", url)
        tar_part = speech_tar.with_name(speech_tar.name + ".part")
        urllib.request.urlretrieve(url, tar_part)
        tar_part.replace(speech_tar)
    print("Extracting...")
    extract_tmp = speech_root.with_name(speech_root.name + ".extracting")
    if extract_tmp.exists():
        # Left over from an interrupted run; start from a clean directory.
        shutil.rmtree(extract_tmp)
    extract_tmp.mkdir(parents=True, exist_ok=True)
    with tarfile.open(speech_tar, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Use the safe extraction filter where this Python provides it.
            tar.extractall(path=extract_tmp, filter="data")
        else:
            tar.extractall(path=extract_tmp)
    extract_tmp.rename(speech_root)

print("speech_root:", speech_root)

In [None]:
#@title 2-2) 指示コマンドを RAW にコピー
import shutil

def copy_all(src_dir: Path, dst_dir: Path):
    """Copy every ``*.wav`` found directly inside ``src_dir`` into ``dst_dir``.

    ``dst_dir`` is created if needed. Returns the number of ``*.wav`` files
    present in ``dst_dir`` afterwards, which includes files that were already
    there before the call.
    """
    dst_dir.mkdir(parents=True, exist_ok=True)
    for wav in src_dir.glob("*.wav"):
        shutil.copy2(wav, dst_dir / wav.name)
    return sum(1 for _ in dst_dir.glob("*.wav"))

# Copy each command word's recordings into RAW/<word>, warning about any
# word folder that is missing from the extracted dataset.
for w in COMMAND_WORDS:
    src = speech_root / w
    if src.exists():
        n = copy_all(src, RAW / w)
        print(w, n)
    else:
        print("WARN: missing word dir:", src)

In [None]:
#@title 2-3) unknown を作る（Speech Commandsの他単語から抽出）
import random, shutil

# The "unknown" class is sized to match the total number of command-word clips.
target_unknown = sum(len(list((RAW / w).glob("*.wav"))) for w in COMMAND_WORDS)
exclude = set(COMMAND_WORDS + [CUSTOM_WORD, "_background_noise_", "unknown", "background_noise"])

# iterdir()/glob() return entries in filesystem order, which differs between
# machines. Sorting first makes the seeded shuffle below pick the same files
# for the same RNG state.
candidate_dirs = sorted(
    p for p in speech_root.iterdir()
    if p.is_dir() and p.name not in exclude and not p.name.startswith(".")
)

cand_files = []
for d in candidate_dirs:
    cand_files.extend(sorted(d.glob("*.wav")))

random.shuffle(cand_files)
cand_files = cand_files[:target_unknown]

dst = RAW / "unknown"
dst.mkdir(parents=True, exist_ok=True)
# Drop clips left by a previous run of this cell; otherwise each re-run adds a
# different random selection and the class grows past target_unknown.
for stale in dst.glob("*.wav"):
    stale.unlink()
for f in cand_files:
    # Prefix with the word folder name so equal file names from different words do not collide.
    shutil.copy2(f, dst / f"{f.parent.name}_{f.name}")

print("unknown:", len(list(dst.glob("*.wav"))), "target:", target_unknown)

In [None]:
#@title 2-4) background_noise 1秒片を生成（PCM16）
import numpy as np
import librosa
import soundfile as sf
from tqdm import tqdm
import shutil

# Folder of background-noise recordings inside the extracted dataset, and the
# RAW class folder that receives the sliced clips.
bg_src = speech_root / "_background_noise_"
bg_dst = RAW / "background_noise"
bg_dst.mkdir(parents=True, exist_ok=True)

def slice_noise_file(wav_path: Path, out_dir: Path, n_clips: int, prefix: str):
    """Cut ``n_clips`` randomly positioned clips out of one noise recording.

    The recording is loaded as mono at ``SR``. Each clip is ``CLIP_SAMPLES``
    long and is written to ``out_dir`` as PCM16 under the name
    ``{prefix}_{stem}_{index:04d}.wav``. Returns the number of clips written,
    which is 0 when the recording is shorter than one clip.
    """
    samples, _ = librosa.load(str(wav_path), sr=SR, mono=True)
    last_start = len(samples) - CLIP_SAMPLES
    if last_start < 0:
        return 0

    written = 0
    for clip_idx in range(n_clips):
        begin = random.randint(0, last_start)
        segment = samples[begin:begin + CLIP_SAMPLES].astype(np.float32)
        target = out_dir / f"{prefix}_{wav_path.stem}_{clip_idx:04d}.wav"
        sf.write(target, segment, SR, subtype="PCM_16")
        written += 1
    return written

CLIPS_PER_BG_FILE = 170  # about 6 source files x 170 clips ≈ 1000 clips

# Slice every background recording and add up how many clips were written.
total = sum(
    slice_noise_file(f, bg_dst, CLIPS_PER_BG_FILE, prefix="bg")
    for f in tqdm(sorted(bg_src.glob("*.wav")))
)

# Rebuild noise_pool from scratch as a copy of the clips (used for mixing during training).
if NOISE_POOL.exists():
    shutil.rmtree(NOISE_POOL)
NOISE_POOL.mkdir(parents=True, exist_ok=True)
for f in bg_dst.glob("*.wav"):
    shutil.copy2(f, NOISE_POOL / f.name)

print("background_noise clips:", total)
print("noise_pool clips:", sum(1 for _ in NOISE_POOL.glob("*.wav")))

## 3) asial データを RAW/asial に入れる
- ZIPアップロード（wav入り）、または Google Drive からコピー（3-B）。通常はどちらか一方だけを有効にしてください。

In [None]:
#@title 3) asial をZIPアップロードで投入（任意）
from google.colab import files
import zipfile, shutil

UPLOAD_ASIAL_ZIP = True  #@param {type:"boolean"}


def _is_archive_junk(rel_path):
    """Return True for macOS zip metadata: anything under ``__MACOSX/`` or named ``._*``."""
    return "__MACOSX" in rel_path.parts or rel_path.name.startswith("._")


if UPLOAD_ASIAL_ZIP:
    uploaded = files.upload()
    if not uploaded:
        # Nothing was uploaded, so there is no archive to unpack.
        print("No file uploaded; skipping.")
    else:
        zip_path = next(iter(uploaded.keys()))
        asial_dir = RAW / CUSTOM_WORD
        asial_dir.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(zip_path, "r") as z:
            z.extractall(asial_dir)

        # Flatten: bring wavs from sub-folders up into asial_dir. Zips made on
        # macOS carry "__MACOSX/._name.wav" metadata entries that match *.wav
        # but are not audio; copying them up would break decoding in later cells.
        for w in list(asial_dir.rglob("*.wav")):
            if _is_archive_junk(w.relative_to(asial_dir)):
                if w.parent == asial_dir:
                    # Top-level junk would be globbed by the later cells, so remove it.
                    w.unlink()
                continue
            if w.parent != asial_dir:
                shutil.copy2(w, asial_dir / w.name)

        print("asial wav count:", len(list(asial_dir.glob("*.wav"))))
else:
    print("Skip upload.")

In [None]:
#@title 3-B) Google Drive からコピー（任意）
USE_DRIVE = True  #@param {type:"boolean"}
DRIVE_ASIAL_DIR = "/content/drive/MyDrive/asial_wavs"  #@param {type:"string"}

# Optional path: mount Google Drive and copy the custom-word wavs found
# directly inside DRIVE_ASIAL_DIR into RAW/<custom word>.
if not USE_DRIVE:
    print("Skip drive.")
else:
    from google.colab import drive
    drive.mount("/content/drive")

    src = Path(DRIVE_ASIAL_DIR)
    assert src.exists(), f"not found: {src}"

    dst = RAW / CUSTOM_WORD
    dst.mkdir(parents=True, exist_ok=True)
    for f in src.glob("*.wav"):
        shutil.copy2(f, dst / f.name)

    print("asial wav count:", len(list(dst.glob('*.wav'))))

## 4) WAV標準化（44.1kHz / mono / 約1秒 / PCM16）

In [None]:
#@title 4) 全ラベルの wav を STD に標準化（PCM16）
import numpy as np
import librosa
import soundfile as sf
from tqdm import tqdm
import shutil
import random
MAX_STD_PER_LABEL = 1000  # maximum number of clips per class (other than asial)


def pad_or_trim(x, n):
    """Return ``x`` forced to exactly ``n`` samples.

    Shorter inputs are zero-padded at the end; longer inputs are cut to the
    first ``n`` samples.
    """
    missing = n - len(x)
    return np.pad(x, (0, missing)) if missing > 0 else x[:n]

def standardize_wav(in_path: Path, out_path: Path):
    """Rewrite one wav in the standard clip format.

    Loads ``in_path`` as mono at ``SR``, pads or trims it to ``CLIP_SAMPLES``
    via ``pad_or_trim``, and writes it to ``out_path`` as PCM16. The parent
    directory of ``out_path`` is created if needed.
    """
    samples, _ = librosa.load(str(in_path), sr=SR, mono=True)
    fixed = pad_or_trim(samples.astype(np.float32), CLIP_SAMPLES)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(str(out_path), fixed, SR, subtype="PCM_16")

# Rebuild STD from scratch so clips from a previous run cannot leak in.
if STD.exists():
    shutil.rmtree(STD)
STD.mkdir(parents=True, exist_ok=True)

for label in LABELS:
    src_dir = RAW / label
    if not src_dir.exists():
        print("WARN missing label dir:", src_dir)
        continue
    # Named wav_paths, not `files`, so the google.colab `files` module imported
    # by other cells is not shadowed.
    wav_paths = sorted(src_dir.glob("*.wav"))
    # Apply the per-class cap declared as MAX_STD_PER_LABEL. The custom word is
    # exempt because it is the scarce class. Sampling from the sorted list keeps
    # the selection reproducible for a given RNG state.
    if label != CUSTOM_WORD and len(wav_paths) > MAX_STD_PER_LABEL:
        wav_paths = sorted(random.sample(wav_paths, MAX_STD_PER_LABEL))
    for f in tqdm(wav_paths, desc=f"std:{label}"):
        standardize_wav(f, STD / label / f.name)

print("=== STD counts ===")
for label in LABELS:
    print(label, len(list((STD/label).glob("*.wav"))))

## 5) BROWSER_FFT相当のスペクトログラム特徴量を作って学習

In [None]:
#@title 5-A) dBレンジ自動キャリブレーション（重要）
# Purpose: keep the 0..1 features from sticking to the upper end (mean >= 0.8)
# by sampling dB values from the wavs in STD and deriving MIN_DB/MAX_DB from them.
import numpy as np, librosa, random
from pathlib import Path

SAMPLE_PER_LABEL = 30  # clips sampled per label (more is steadier but slower)
P_LOW  = 1.0           # lower percentile
P_HIGH = 99.0          # upper percentile

def compute_db_clip_range(std_root: Path):
    """Estimate the dB clipping range ``(lo, hi)`` from standardized clips.

    Up to ``SAMPLE_PER_LABEL`` wavs per label under ``std_root`` are framed,
    windowed, and transformed the same way ``wav_to_browserfft_db`` does it
    (periodic Hann window, magnitude / FFT_SIZE, first ``BINS`` bins). The
    result is the ``P_LOW``/``P_HIGH`` percentiles of the dB values, widened
    by 5 dB on each side.

    Raises:
        ValueError: if no wav file is found for any label.
    """
    all_db = []
    # Periodic Hann, matching tf.signal.hann_window(FRAME_LEN, periodic=True)
    # in the feature extractor. np.hanning alone is the symmetric variant.
    window = np.hanning(FRAME_LEN + 1)[:-1].astype(np.float32)

    for label in LABELS:
        # Sort before shuffling so the sample does not depend on filesystem order.
        wav_paths = sorted((std_root / label).glob("*.wav"))
        if not wav_paths:
            continue
        random.shuffle(wav_paths)
        for f in wav_paths[:SAMPLE_PER_LABEL]:
            y, _ = librosa.load(str(f), sr=SR, mono=True)
            if len(y) < CLIP_SAMPLES:
                y = np.pad(y, (0, CLIP_SAMPLES - len(y)))
            y = y[:CLIP_SAMPLES]
            frames = y.reshape(NUM_FRAMES, FRAME_LEN) * window
            mag = np.abs(np.fft.rfft(frames, n=FFT_SIZE)) / FFT_SIZE
            # Calibrate on the bins the model actually sees. Including the
            # discarded upper bins would skew the percentiles.
            mag = mag[:, :BINS]
            db = 20 * np.log10(mag + 1e-12)
            all_db.append(db.reshape(-1))

    if not all_db:
        raise ValueError(
            f"no wav files found under {std_root}; run the standardization cell first"
        )

    all_db = np.concatenate(all_db, axis=0)
    lo = float(np.percentile(all_db, P_LOW)) - 5.0
    hi = float(np.percentile(all_db, P_HIGH)) + 5.0
    return lo, hi

MIN_DB, MAX_DB = compute_db_clip_range(STD)
print("Auto MIN_DB:", MIN_DB, "MAX_DB:", MAX_DB)

In [None]:
#@title 5) tf.data Dataset（BROWSER_FFT特徴量）
import tensorflow as tf
import numpy as np

AUTOTUNE = tf.data.AUTOTUNE

# Two-way mapping between label names and integer class ids (ids follow LABELS order).
label_to_id = {name: idx for idx, name in enumerate(LABELS)}
id_to_label = {idx: name for name, idx in label_to_id.items()}

# Every wav below NOISE_POOL is a candidate for noise mixing during training.
noise_files = list(map(str, NOISE_POOL.rglob("*.wav"))) if NOISE_POOL.exists() else []
print("noise_files:", len(noise_files))

MIX_PROB = 0.7  # mix noise only with this probability; mixing every time tends to collapse predictions to "unknown"


def decode_wav(path):
    """Read a WAV file and return a 1-D waveform of exactly ``CLIP_SAMPLES`` samples.

    The file is decoded as a single channel. Shorter audio is zero-padded at
    the end and longer audio is truncated. The sample rate reported by the
    decoder is not checked.
    """
    raw = tf.io.read_file(path)
    samples, _ = tf.audio.decode_wav(raw, desired_channels=1)
    samples = tf.squeeze(samples, axis=-1)
    length = tf.shape(samples)[0]
    # Pad by the shortfall (0 when long enough), then cut to length. This
    # covers both the short and the long case without a tf.cond.
    shortfall = tf.maximum(CLIP_SAMPLES - length, 0)
    return tf.pad(samples, [[0, shortfall]])[:CLIP_SAMPLES]

def random_mix_noise(wav):
    """Randomly mix a background-noise clip into ``wav``.

    Returns ``wav`` unchanged when ``noise_files`` is empty, or when a uniform
    draw in [0, 1) is >= ``MIX_PROB``. Otherwise one entry of ``noise_files``
    is decoded with ``decode_wav`` and scaled so that the RMS ratio between
    ``wav`` and the noise matches an SNR drawn uniformly from 10-30 dB. The
    scaled noise is added to ``wav`` and the sum is clipped to [-1, 1].
    """
    if len(noise_files) == 0:
        return wav

    # Mix noise only with some probability (mixing every time tends to collapse predictions to "unknown").
    if tf.random.uniform([], 0.0, 1.0) >= MIX_PROB:
        return wav

    # Pick one noise file uniformly at random.
    nf = tf.random.uniform([], 0, len(noise_files), dtype=tf.int32)
    noise = decode_wav(tf.constant(noise_files)[nf])

    # Keep the SNR fairly high (avoid over-mixing).
    snr_db = tf.random.uniform([], 10.0, 30.0)
    # The small epsilon keeps the square root and the division finite for silent clips.
    wav_rms = tf.sqrt(tf.reduce_mean(tf.square(wav)) + 1e-9)
    noi_rms = tf.sqrt(tf.reduce_mean(tf.square(noise)) + 1e-9)

    # dB -> amplitude ratio, then the gain that puts the noise at that ratio below the signal.
    snr = tf.pow(10.0, snr_db / 20.0)
    scale = wav_rms / (snr * noi_rms + 1e-9)
    mixed = tf.clip_by_value(wav + noise * scale, -1.0, 1.0)
    return mixed

def wav_to_browserfft_db(wav):
    """Convert a waveform into a normalized dB spectrogram feature.

    ``wav`` is reshaped into ``NUM_FRAMES`` non-overlapping frames of
    ``FRAME_LEN`` samples. Each frame is Hann-windowed and transformed with a
    real FFT. Magnitudes are divided by ``FFT_SIZE``, converted to dB, cut to
    the first ``BINS`` bins, clipped to [``MIN_DB``, ``MAX_DB``], and rescaled
    to 0..1. Returns a tensor of shape [NUM_FRAMES, BINS, 1].

    NOTE(review): the Web Audio AnalyserNode specification describes a
    Blackman window, while a Hann window is used here — confirm which one the
    browser pipeline should be matched against.
    NOTE(review): @tensorflow-models/speech-commands appears to apply its own
    per-example normalization before prediction — confirm that the 0..1
    scaling used for training is compatible with what the browser feeds the
    model.
    """
    frames = tf.reshape(wav, [NUM_FRAMES, FRAME_LEN])  # [43, 1024]

    # Apply a window function to approximate the WebAudio side (reduces spectral leakage).
    window = tf.signal.hann_window(FRAME_LEN, periodic=True)
    frames = frames * window

    fft = tf.signal.rfft(frames)                       # [43, 513]
    mag = tf.abs(fft)

    # ---- Important: scale like WebAudio (without this the values tend to saturate at the upper end) ----
    mag = mag / tf.cast(FFT_SIZE, tf.float32)

    # Drop the Nyquist bin, leaving 512 bins.
    mag = mag[:, :FULL_BINS]                           # [43, 512]

    # Convert to dB (log10 expressed via natural logs).
    db = 20.0 * (tf.math.log(mag + 1e-12) / tf.math.log(10.0))  # [43, 512]

    # Keep only the first BINS bins.
    db = db[:, :BINS]                                  # [43, BINS]

    # Clip, then normalize to 0..1.
    db = tf.clip_by_value(db, MIN_DB, MAX_DB)
    db = (db - MIN_DB) / (MAX_DB - MIN_DB)

    return db[..., tf.newaxis]                         # [43, BINS, 1]

def make_file_label_lists():
    """Collect every standardized wav with its class id, in shuffled order.

    Walks ``STD/<label>`` for each entry of ``LABELS`` (printing a warning for
    missing folders) and returns ``(paths, labels)``: two parallel lists of
    path strings and integer ids, shuffled together with ``random``.
    """
    pairs = []
    for label in LABELS:
        label_dir = STD / label
        if not label_dir.exists():
            print("WARN missing:", label_dir)
            continue
        pairs.extend((str(wav), label_to_id[label]) for wav in label_dir.glob("*.wav"))

    # One shuffle over the (path, id) pairs keeps both lists aligned.
    random.shuffle(pairs)
    paths = [path for path, _ in pairs]
    labels = [class_id for _, class_id in pairs]
    return paths, labels

def build_ds(paths, labels, training: bool):
    """Build a batched ``tf.data`` pipeline of (spectrogram, label) pairs.

    Args:
        paths: list of wav file paths.
        labels: list of integer class ids, parallel to ``paths``.
        training: when True, the examples are reshuffled every epoch and
            noise is mixed in at random via ``random_mix_noise``.

    Returns:
        A dataset yielding batches of 64 ``(x, y)`` pairs, where ``x`` is the
        output of ``wav_to_browserfft_db``.
    """
    ds = tf.data.Dataset.from_tensor_slices((paths, labels))
    if training:
        # The training list is assembled class by class. A shuffle buffer
        # smaller than the dataset would only mix neighbouring files and give
        # near single-class batches. At this point the elements are only
        # (path, label) scalars, so buffering the whole list is cheap.
        ds = ds.shuffle(max(len(paths), 1), seed=SEED, reshuffle_each_iteration=True)

    def _map(p, y):
        wav = decode_wav(p)
        if training:
            wav = random_mix_noise(wav)
        x = wav_to_browserfft_db(wav)
        return x, y

    ds = ds.map(_map, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(64).prefetch(AUTOTUNE)
    return ds

paths, labels = make_file_label_lists()
print("total files:", len(paths))

# Per-class split (every class contributes at least one validation clip).
from collections import defaultdict
by_class = defaultdict(list)
for p, y in zip(paths, labels):
    by_class[y].append(p)

val_ratio = 0.15
train_paths, train_labels = [], []
val_paths, val_labels = [], []

for y, plist in by_class.items():
    random.shuffle(plist)
    n_val = max(1, int(len(plist) * val_ratio))
    # The first n_val shuffled clips go to validation, the rest to training.
    val_paths += plist[:n_val]
    val_labels += [y] * len(plist[:n_val])
    train_paths += plist[n_val:]
    train_labels += [y] * len(plist[n_val:])

train_ds = build_ds(train_paths, train_labels, True)
val_ds   = build_ds(val_paths, val_labels, False)

# Shape check on the first training batch.
for x, y in train_ds.take(1):
    print("x:", x.shape, "y:", y.shape)

In [None]:
#@title 5-B) データ分布＆特徴量レンジ確認（任意）
from collections import Counter
import numpy as np

def _named_counts(ids):
    """Map a sequence of class ids to ``{label name: occurrences}``."""
    return {LABELS[k]: v for k, v in Counter(ids).items()}

print("train label counts:", _named_counts(train_labels))
print("val   label counts:", _named_counts(val_labels))

# Value range of one training batch; the features are expected to lie in 0..1.
for xb, yb in train_ds.take(1):
    x_np = xb.numpy()
    stats = ("min", x_np.min(), "max", x_np.max(), "mean", x_np.mean(), "std", x_np.std())
    print("x batch stats:", *stats)

## 6) モデル（軽量CNN）を学習

In [None]:
#@title 6) モデル学習
from tensorflow import keras
from tensorflow.keras import layers

# Take the per-example input shape (batch dimension dropped) from one training batch.
for xb, yb in train_ds.take(1):
    input_shape = xb.shape[1:]
print("input_shape:", input_shape)

# Small CNN: three 3x3 conv layers (16/32/64 filters) with max-pooling after
# the first two, global average pooling, dropout, and a softmax over LABELS.
model = keras.Sequential([
    layers.Input(shape=input_shape),
    layers.Conv2D(16, (3,3), activation="relu", padding="same"),
    layers.MaxPool2D((2,2)),
    layers.Conv2D(32, (3,3), activation="relu", padding="same"),
    layers.MaxPool2D((2,2)),
    layers.Dropout(0.25),
    layers.Conv2D(64, (3,3), activation="relu", padding="same"),
    layers.GlobalAveragePooling2D(),
    layers.Dropout(0.25),
    layers.Dense(len(LABELS), activation="softmax")
])

# Sparse categorical cross-entropy: the datasets yield integer class ids, not one-hot rows.
model.compile(
    optimizer=keras.optimizers.Adam(1e-3),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

model.summary()

# Stop when validation accuracy stalls, and halve the learning rate when validation loss plateaus.
early_stop = keras.callbacks.EarlyStopping(
    monitor="val_accuracy", patience=15, restore_best_weights=True
)
lr_decay = keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5
)
callbacks = [early_stop, lr_decay]

# --- Class weights (inverse frequency) to keep predictions from collapsing to "unknown" ---
from collections import Counter
counts = Counter(train_labels)
total = sum(counts.values())
num_classes = len(LABELS)
# Classes absent from the training split are treated as having one example.
class_weight = {
    i: total / (num_classes * counts.get(i, 1))
    for i in range(num_classes)
}

# The custom word has very few clips, so boost it a little (the real fix is more data).
class_weight[label_to_id[CUSTOM_WORD]] *= 5.0

print("class_weight:", {LABELS[i]: round(w, 3) for i, w in class_weight.items()})

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=20,
    callbacks=callbacks,
    class_weight=class_weight,
)

In [None]:
#@title 6-B) 予測の偏りチェック（任意）
from collections import Counter
import numpy as np

# Histogram of true and predicted classes over the validation set, to spot a
# model that collapses onto a single class.
pred_hist = Counter()
true_hist = Counter()

for xb, yb in val_ds:
    yp = model.predict(xb, verbose=0)
    pred_hist.update(np.argmax(yp, axis=1).tolist())
    # val_ds yields sparse integer class ids of shape [batch], not one-hot
    # rows. Count them directly; argmax over axis=1 fails on a 1-D array.
    true_hist.update(yb.numpy().tolist())

# Sorted by class id so both histograms print in the same order.
print("val true hist:", {LABELS[k]: v for k, v in sorted(true_hist.items())})
print("val pred hist:", {LABELS[k]: v for k, v in sorted(pred_hist.items())})

## 7) 評価

In [None]:
#@title 7) 評価
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report

# Fix the class order so the matrix and the report always cover every label.
labels_idx = list(range(len(LABELS)))

# Collect ground truth and arg-max predictions batch by batch.
y_true, y_pred = [], []
for xb, yb in val_ds:
    probs = model.predict(xb, verbose=0)
    y_true += yb.numpy().tolist()
    y_pred += np.argmax(probs, axis=1).tolist()

cm = confusion_matrix(y_true, y_pred, labels=labels_idx)
print("Confusion matrix (rows=true, cols=pred):")
print(cm, end="\n\n")

report = classification_report(
    y_true,
    y_pred,
    labels=labels_idx,
    target_names=LABELS,
    zero_division=0,
)
print(report)

## 8) TFJSへ変換（model.json + metadata.json）

In [None]:
#@title 8) TFJS 変換＆ metadata.json 生成
import tensorflowjs as tfjs
import json, shutil, zipfile
from pathlib import Path

# Start from an empty export directory so files from an earlier export cannot remain.
export_dir = WORK / "tfjs_model"
if export_dir.exists():
    shutil.rmtree(export_dir)
export_dir.mkdir(parents=True, exist_ok=True)

# NOTE(review): the install cell pins tensorflow==2.19.0 — confirm that the
# model.json written here loads in the tfjs-layers version used by the browser.
tfjs.converters.save_keras_model(model, str(export_dir))
print("Saved TFJS model to:", export_dir)
print("Files:", [p.name for p in export_dir.iterdir()])

# metadata.json for speech-commands
# NOTE(review): speech-commands appears to special-case the labels
# "_background_noise_" and "_unknown_"; LABELS uses "background_noise" and
# "unknown" — confirm whether the browser code depends on that filtering.
metadata = {
    "wordLabels": LABELS
}
with open(export_dir / "metadata.json", "w", encoding="utf-8") as f:
    json.dump(metadata, f, ensure_ascii=False, indent=2)

# For reference: extra information (optional)
extra = {
    "labels": LABELS,
    "sampleRateHz": SR,
    "fftSize": FFT_SIZE,
    "numFrames": NUM_FRAMES,
    "bins": BINS
}
with open(export_dir / "labels.json", "w", encoding="utf-8") as f:
    json.dump(extra, f, ensure_ascii=False, indent=2)

# Zip the export directory (flat archive: entries are stored by file name only)
zip_path = WORK / "tfjs_model.zip"
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
    for p in export_dir.iterdir():
        z.write(p, arcname=p.name)

print("ZIP:", zip_path)

In [None]:
#@title 9) ZIP をダウンロード
from google.colab import files
# Derive the path from WORK rather than repeating the literal, so the download
# keeps pointing at the archive written by the export cell if WORK is changed.
files.download(str(WORK / "tfjs_model.zip"))

## 10) ブラウザ側の読み込み例（BROWSER_FFT）

```js
// Top-level `await` is only valid in an ES module, and modules run in strict
// mode, where assigning to an undeclared name throws. Declare the variable.
const recognizer = speechCommands.create(
  "BROWSER_FFT",
  null,
  "/tfjs_model/model.json",
  "/tfjs_model/metadata.json"
);
await recognizer.ensureModelLoaded();
```