# ESN波形分類3（音声分類への応用）

## 初期設定

In [None]:
# @markdown 各種パラメータを変更可能です（デフォルトのまま実行してもOK）

# 音の特徴を何次元の数値で表現するか
N_MFCC = 13  # @param {type:"number"}

# 分析フレームを移動させる歩幅（小さいほど時間分解能が高い）default: 20ms (8000Hz の場合)
HOP_LEN = 160 # @param {type:"number"}

# 一度の分析に使う音声データの長さ（長いほど周波数分解能が高い）default: 50ms (8000Hz の場合)
WIN_LEN = 400 # @param {type:"number"}

# リザバーのニューロン数
N_RESERVOIR     = 400 # @param {type:"number"}

# スペクトル半径
SPECTRAL_RADIUS = 0.9 # @param {type:"number"}

# 内部結合の密度
DENSITY         = 0.05 # @param {type:"number"}

# リーク率
LEAK_RATE       = 0.3 # @param {type:"number"}

# 入力スケーリング
INPUT_SCALING   = 0.8 # @param {type:"number"}

# リッジ回帰の正則化係数
RIDGE_REG       = 1e-5 # @param {type:"number"}


print(f"MFCCの次元数: {N_MFCC }")
print(f"MFCCフレーム歩幅: {HOP_LEN}")
print(f"MFCCフレーム長: {WIN_LEN}")
print(f"リザバーのニューロン数: {N_RESERVOIR}")
print(f"スペクトル半径: {SPECTRAL_RADIUS}")
print(f"結合密度: {DENSITY}")
print(f"リーク率: {LEAK_RATE}")
print(f"入力スケーリング: {INPUT_SCALING}")
print(f"リッジ回帰の正則化係数: {RIDGE_REG}")

In [None]:
# @markdown チェックされているクラスを学習対象とします（デフォルトのまま実行してもOK）

# チェックボックスの定義
yes = True #@param {type:"boolean"}
no = True #@param {type:"boolean"}
stop = True #@param {type:"boolean"}
cat = False #@param {type:"boolean"}
dog = False #@param {type:"boolean"}
one = False #@param {type:"boolean"}
two = False #@param {type:"boolean"}
three = False #@param {type:"boolean"}

# 各クラスからランダムに取得する数
N_PER_CLASS = 200 # @param {type:"number"}

# 選択されたクラスをリスト化する処理
# クラス名をリストに定義
all_classes = ['yes', 'no', 'stop', 'cat', 'dog', 'one', 'two', 'three']

# locals()（現在の変数一覧）から、チェックが入っているものだけを抜き出す
selected_classes = [name for name in all_classes if locals().get(name)]


print(f"対象クラス: {selected_classes}")
print(f"1クラス当たりの取得データ数: {N_PER_CLASS}")

## データセットのダウンロード（初回実行のみ１～２分かかる）

In [None]:
# @markdown  データセットをダウンロードします。１～２分程度かかります。<br>
# @markdown  すでにダウンロードされている場合はスキップされますので「すべてのセルを実行」をクリックしても問題ありません。

# Google Speech Commands Dataset

%%bash
# 既にダウンロード済みならスキップ、未DLならDLして展開
if [ ! -f speech_commands_v0.02.tar.gz ]; then
  wget http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz
  tar -xf speech_commands_v0.02.tar.gz
else
  echo "already downloaded, skip download and extract"
fi

## 指定したクラスからランダムにデータを選択

In [None]:
# @markdown  指定したクラスからランダムにデータを選びます

import random
import os
import random
import numpy as np
import librosa

np.random.seed(42)

SR = 8000
DURATION = 1.0
SAMPLES = int(SR * DURATION)

CLASSES = selected_classes

X_list = []
y_list = []

for label, cname in enumerate(CLASSES):
    # クラスフォルダ内の wav をすべて対象にする（話者制限なし）
    files = [
        f for f in os.listdir(cname)
        if f.endswith(".wav")
    ]

    if len(files) == 0:
        print(f"[WARN] クラス {cname} に wav がありません。スキップします。")
        continue

    # ランダムにシャッフル
    np.random.shuffle(files)

    # N_PER_CLASS に制限
    files = files[:N_PER_CLASS]

    print(f"クラス {cname}: {len(files)} 個（ランダム抽出）")

    for f in files:
        path = os.path.join(cname, f)
        wav, _ = librosa.load(path, sr=SR)

        if len(wav) > SAMPLES:
            wav = wav[:SAMPLES]
        elif len(wav) < SAMPLES:
            wav = np.pad(wav, (0, SAMPLES - len(wav)))

        X_list.append(wav.astype(np.float32))
        y_list.append(label)

X = np.stack(X_list)
y = np.array(y_list)

print("X shape:", X.shape)
print("y shape:", y.shape)
print("クラスごとの件数:", np.bincount(y, minlength=len(CLASSES)))

## 選択したデータの確認

In [None]:
# @markdown  選択したデータからランダムに１つ選択し音を確認できます（実行のたびに変わります）

# 学習データの中からランダムに波形を選び、グラフ描画と音声出力
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio

def show_random_sample(volume=0.5):
    """
    学習データ X, y からランダムに1つ取り出し、
    波形を描画して、音声を再生する。
    volume: 0.0〜1.0 で音量スケール
    """
    idx = np.random.randint(0, len(X))
    wav = X[idx]
    label_id = int(y[idx])
    label_name = CLASSES[label_id]

    print(f"index = {idx}, label = {label_name} (id={label_id})")

    # 時間軸（秒）
    t = np.arange(len(wav)) / SR

    # 波形プロット（全体）
    plt.figure(figsize=(10, 3))
    plt.plot(t, wav)
    plt.title(f"Waveform: {label_name}")
    plt.xlabel("Time [s]")
    plt.ylabel("Amplitude")
    plt.grid(True)
    plt.show()

    # 音量調整して再生
    return Audio(wav * volume, rate=SR, normalize=False)

# 実行するたびにランダムなサンプルが出る
audio_obj = show_random_sample(volume=0.5)
audio_obj

## 無音区間削除

In [None]:
# @markdown  選択した全てのデータについて無音区間を削除します

import numpy as np
import librosa

def trim_silence_rms(
    wav,
    sr,
    frame_length=400,   # 50ms @ 8kHz
    hop_length=160,     # 20ms
    rms_thresh=0.02     # 無音判定の閾値（重要）
):
    """
    RMSベースで無音区間を除去した波形を返す
    """
    # RMS計算（shape: (1, n_frames)）
    rms = librosa.feature.rms(
        y=wav,
        frame_length=frame_length,
        hop_length=hop_length
    )[0]

    # 有音フレームのインデックス
    keep = rms > rms_thresh

    if not np.any(keep):
        # 全部無音と判定された場合は元の波形を返す
        return wav

    # フレーム → サンプル範囲に変換
    idx = np.where(keep)[0]
    start = idx[0] * hop_length
    end   = min(len(wav), idx[-1] * hop_length + frame_length)

    return wav[start:end]


X_trim = []

for wav in X:
    wav_trim = trim_silence_rms(
        wav,
        sr=SR,
        frame_length=WIN_LEN,   # MFCCと合わせる
        hop_length=HOP_LEN,
        rms_thresh=0.02
    )
    X_trim.append(wav_trim.astype(np.float32))

# list → object array（長さが変わるため）
X_trim = np.array(X_trim, dtype=object)

In [None]:
# @markdown  選択したデータからランダムに１つ選択し、元波形と無音カット後の波形を確認できます（実行のたびに変わります）

# 波形確認２つ並べる

# 学習データの中からランダムに波形を選び、
# 元波形 X と 無音カット後 X_trim を上下に描画＆再生

import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio, display

def show_random_sample_with_trim(volume=0.5):
    """
    学習データ X, X_trim, y からランダムに1つ取り出し、
    元波形と無音カット後波形を上下に表示して音声を再生する。
    """
    idx = np.random.randint(0, len(X))

    wav_org  = X[idx]
    wav_trim = X_trim[idx]

    label_id = int(y[idx])
    label_name = CLASSES[label_id]

    print(f"index = {idx}, label = {label_name} (id={label_id})")
    print(f"original length = {len(wav_org)}, trimmed length = {len(wav_trim)}")

    # 時間軸
    t_org  = np.arange(len(wav_org))  / SR
    t_trim = np.arange(len(wav_trim)) / SR

    # ==== 波形プロット ====
    fig, axes = plt.subplots(2, 1, figsize=(10, 5), sharex=False)

    axes[0].plot(t_org, wav_org)
    axes[0].set_title("Original waveform (X)")
    axes[0].set_ylabel("Amplitude")
    axes[0].grid(True)

    axes[1].plot(t_trim, wav_trim)
    axes[1].set_title("Trimmed waveform (X_trim)")
    axes[1].set_xlabel("Time [s]")
    axes[1].set_ylabel("Amplitude")
    axes[1].grid(True)

    plt.tight_layout()
    plt.show()

    # ==== 音声再生 ====
    print("▶ Original")
    display(Audio(wav_org * volume, rate=SR, normalize=False))
    print("▶ Trimmed")
    display(Audio(wav_trim * volume, rate=SR, normalize=False))

# 実行するたびにランダムなサンプルが出る
audio_obj = show_random_sample_with_trim(volume=0.5)
audio_obj

## 正規化

In [None]:
# @markdown  選択した全てのデータについて正規化します

# =========================
# 正規化（無音カット後の波形 X_trim → X_norm）
# =========================

import numpy as np

X_norm = []

for wav in X_trim:
    max_abs = np.max(np.abs(wav)) + 1e-8
    X_norm.append(wav / max_abs)

# object配列にして長さ違いを許容
X_norm = np.array(X_norm, dtype=object)

print("正規化完了")

In [None]:
# @markdown  選択したデータからランダムに１つ選択し、元波形と正規化後の波形を確認できます（実行のたびに変わります）


# =========================
# 正規化前後の波形を「全体」で比較表示
# （何度も実行する想定）
# =========================

import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio, display

def show_random_norm_compare(volume=0.3):
    idx = np.random.randint(0, len(X_norm))

    wav_before = X_trim[idx]
    wav_after  = X_norm[idx]

    label_id = int(y[idx])
    label_name = CLASSES[label_id]

    print(f"index = {idx}, label = {label_name} (id={label_id})")
    print(f"length: before={len(wav_before)}, after={len(wav_after)}")

    t_before = np.arange(len(wav_before)) / SR
    t_after  = np.arange(len(wav_after)) / SR

    plt.figure(figsize=(10, 5))

    # --- 正規化前 ---
    plt.subplot(2, 1, 1)
    plt.plot(t_before, wav_before)
    plt.title("Before normalization (X_trim)")
    plt.ylabel("Amplitude")
    plt.grid(True)

    # --- 正規化後 ---
    plt.subplot(2, 1, 2)
    plt.plot(t_after, wav_after)
    plt.title("After normalization (X_norm)")
    plt.xlabel("Time [s]")
    plt.ylabel("Amplitude")
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    # ==== 音声再生 ====
    print("▶ Before")
    display(Audio(wav_before * volume, rate=SR, normalize=False))
    print("▶ After")
    display(Audio(wav_after  * volume, rate=SR, normalize=False))

# 実行（何度でもOK）
show_random_norm_compare(volume=0.5)

## MFCCで前処理

In [None]:
# @markdown  MFCCで前処理を実行します


# =========================
# 前処理：MFCC（X_norm → X_mfcc）
# =========================

import numpy as np
import librosa

X_mfcc = []

for i in range(len(X_norm)):
    wav = X_norm[i]

    # MFCC（形状: (n_mfcc, time_frames)）
    mfcc = librosa.feature.mfcc(
        y=wav,
        sr=SR,
        n_mfcc=N_MFCC,
        hop_length=HOP_LEN,
        n_fft=WIN_LEN
    )

    # 転置して (time_frames, n_mfcc) の形にする
    # ESN に入れるため
    mfcc = mfcc.T   # (T, D)

    # MFCC後の発話全体を最大値でスケーリング（値域をおおよそ [-1, 1] に）
    mfcc = mfcc / (np.max(np.abs(mfcc)) + 1e-8)

    X_mfcc.append(mfcc.astype(np.float32))

# Pythonリスト → オブジェクト配列（可変長 T）
X_mfcc = np.array(X_mfcc, dtype=object)

print("MFCC完了")

In [None]:
# @markdown 選択したデータからランダムに１つ選択し、MFCC後の波形を確認できます（実行のたびに変わります）

import numpy as np
import matplotlib.pyplot as plt
import math

# ランダムに1サンプル選択
idx = np.random.randint(len(X_mfcc))
mfcc = X_mfcc[idx]   # shape: (T, D)


# ラベルの確認
label_id = int(y[idx])
label_name = CLASSES[label_id]
print(f"index = {idx}, label = {label_name} (id={label_id})")


# サンプル数の確認
print(f"MFCC前の波形のサンプル数：{X_norm[idx].shape[0]}")
T = math.floor(X_norm[idx].shape[0] / HOP_LEN) + 1
print(f"MFCC後の波形のサンプル数：{T}")


T, D = mfcc.shape

plt.figure(figsize=(12, 6))

for d in range(D):
    plt.plot(mfcc[:, d], label=f"MFCC {d}")

plt.title(f"Random MFCC sample (index={idx})")
plt.xlabel("Time frame")
plt.ylabel("Normalized MFCC value")
plt.grid(True)
# 次元数が多い場合は凡例を消した方が見やすい
plt.legend(ncol=4, fontsize=8)

plt.show()

## Echo State Networkで学習と性能評価

In [None]:
# @markdown ESNでの学習と性能評価まで一気に実施します

# 学習

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix

SEED = 1234

# =========================
# 0. データ準備
# =========================
# X_mfcc: 各要素が (T, D) の ndarray （発話ごとのMFCC系列）
# y:      クラスID（0〜）

num_classes = len(np.unique(y))
n_inputs = X_mfcc[0].shape[1]   # 例: 13次元, 40次元など

indices = np.arange(len(X_mfcc))
idx_train, idx_test = train_test_split(
    indices,
    test_size=0.2,
    random_state=42,
    stratify=y
)

X_train = [X_mfcc[i] for i in idx_train]
X_test  = [X_mfcc[i] for i in idx_test]
y_train = y[idx_train]
y_test  = y[idx_test]

# ---- 情報表示 ----
num_train_utt = len(X_train)
num_test_utt  = len(X_test)
total_train_frames = sum(arr.shape[0] for arr in X_train)  # 全フレーム数 (train)
D = n_inputs

print(f"Train utterances: {num_train_utt}")
print(f"Test utterances: {num_test_utt}")
print("Train class counts:", np.bincount(y_train))
print("Test  class counts:", np.bincount(y_test))
print(f"Total training frames: {total_train_frames}")
print(f"Feature dimension: {D}")
print("")

# =========================
# 1. ESN（リザバー）定義（多次元入力＋density付き）
# =========================

class SimpleESN:
    def __init__(self, n_inputs=40, n_reservoir=300,
                 spectral_radius=0.9, density=0.05,
                 input_scaling=0.5,
                 leak_rate=0.3, ridge_reg=1e-6, seed=0):
        self.n_inputs        = n_inputs
        self.n_reservoir     = n_reservoir
        self.spectral_radius = spectral_radius
        self.density         = density
        self.input_scaling   = input_scaling
        self.leak_rate       = leak_rate
        self.ridge_reg       = ridge_reg
        self.seed            = seed
        self.num_classes     = None  # 後でセット

        rng = np.random.default_rng(seed)

        # 入力 → リザバー（float32）
        self.W_in = (rng.uniform(-1, 1, (n_reservoir, n_inputs))
                    * self.input_scaling).astype(np.float32)

        # バイアス（各リザバーノードに1つ）（float32）
        self.b_in = (rng.uniform(-1, 1, n_reservoir)
                    * self.input_scaling).astype(np.float32)


        # リザバー内部結合（スパース化＋スペクトル半径調整）
        W = rng.uniform(-1, 1, (n_reservoir, n_reservoir)).astype(np.float32)

        if 0.0 < self.density < 1.0:
            mask = rng.uniform(0, 1, (n_reservoir, n_reservoir)) < self.density
            W = W * mask  # density に応じて0を入れる

        # スペクトル半径を調整
        eigvals = np.linalg.eigvals(W.astype(np.float64))
        sr = np.max(np.abs(eigvals))
        if sr > 0:
            W = (W * (self.spectral_radius / sr)).astype(np.float32)

        self.W = W
        self.W_out = None

    def _forward_states(self, u_seq):
        """
        u_seq: (T, D) の MFCC（どんな型でもOK、ここで揃える）
        """
        u_seq = np.asarray(u_seq, dtype=np.float32)
        T, D = u_seq.shape
        assert D == self.n_inputs

        x = np.zeros(self.n_reservoir, dtype=np.float32)
        states = np.zeros((T, self.n_reservoir), dtype=np.float32)

        for t in range(T):
            pre_activation = (
                self.W @ x + self.W_in @ u_seq[t] + self.b_in
            ).astype(np.float32)

            x_new = np.tanh(pre_activation)
            x = (1 - self.leak_rate) * x + self.leak_rate * x_new
            states[t] = x

        return states

    def fit(self, X_seq, y_labels, num_classes):
        """
        X_seq: list of (T_i, D)  発話ごとのMFCC系列
        y_labels: (N,) 発話ラベル
        """
        self.num_classes = num_classes

        N = len(X_seq)

        # 全フレーム数を数える（train全体）
        total_frames = sum(np.asarray(X_seq[i]).shape[0] for i in range(N))

        # H: (total_frames, 1+n_reservoir), Y: (total_frames, num_classes)
        H = np.zeros((total_frames, 1 + self.n_reservoir), dtype=np.float32)
        Y = np.zeros((total_frames, num_classes), dtype=np.float32)

        row = 0
        for i in range(N):
            states = self._forward_states(X_seq[i])     # (T_i, n_reservoir)
            T_i = states.shape[0]

            # 各フレームの特徴量 [1, x(t)] を積む
            H[row:row+T_i, 0] = 1.0
            H[row:row+T_i, 1:] = states

            # 各フレームに「その発話のラベル」を付与
            Y[row:row+T_i, y_labels[i]] = 1.0

            row += T_i

        # リッジ回帰: W_out = (H^T H + λI)^(-1) H^T Y
        Ht = H.T
        I = np.eye(H.shape[1], dtype=np.float32)         #  (1+n_res)×(1+n_res)
        A = Ht @ H + self.ridge_reg * I
        B = Ht @ Y
        self.W_out = np.linalg.solve(A.astype(np.float64), B.astype(np.float64)).astype(np.float32)

    def predict_proba(self, X_seq):
        """
        X_seq: list of (T_i, D)
        戻り値: (N, num_classes)
        → 推論：フレームごとの出力を平均して集約
        """
        N = len(X_seq)
        Y_out = np.zeros((N, self.num_classes), dtype=np.float32)

        for i in range(N):
            states = self._forward_states(X_seq[i])     # (T_i, n_res)
            T_i = states.shape[0]
            H = np.zeros((T_i, 1 + self.n_reservoir), dtype=np.float32)
            H[:, 0] = 1.0
            H[:, 1:] = states

            # 各フレームの線形出力 → 平均
            Z = H @ self.W_out                          # (T_i, num_classes)
            Y_out[i] = Z.mean(axis=0)

        return Y_out

    def predict(self, X_seq):
        """
        クラスID (argmax) を返す ＝ Utterance-level 予測
        """
        Y_lin = self.predict_proba(X_seq)
        return np.argmax(Y_lin, axis=1)

# =========================
# 2. ESN インスタンス作成 & 設定表示
# =========================

esn = SimpleESN(
    n_inputs=n_inputs,
    n_reservoir=N_RESERVOIR,
    spectral_radius=SPECTRAL_RADIUS,
    density=DENSITY,
    input_scaling=INPUT_SCALING,
    leak_rate=LEAK_RATE,
    ridge_reg=RIDGE_REG,
    seed=SEED
)

print("=== ESN Settings ===")
print(f"n_inputs        = {esn.n_inputs}")
print(f"n_reservoir     = {esn.n_reservoir}")
print(f"spectral_radius = {esn.spectral_radius}")
print(f"density         = {esn.density}")
print(f"leak_rate       = {esn.leak_rate}")
print(f"input_scaling   = {esn.input_scaling}")
print(f"ridge_reg       = {esn.ridge_reg}")
print(f"seed            = {esn.seed}")
print(f"num_classes     = {num_classes}")
print("====================\n")

# =========================
# 3. ESN を学習
# =========================

esn.fit(X_train, y_train, num_classes)

# =========================
# 4. 評価（Utterance-level）
# =========================

y_train_pred = esn.predict(X_train)
y_test_pred  = esn.predict(X_test)

train_acc = accuracy_score(y_train, y_train_pred)
test_acc  = accuracy_score(y_test,  y_test_pred)
cm = confusion_matrix(y_test, y_test_pred)

print(f"Train accuracy: {train_acc}")
print(f"Test  accuracy: {test_acc}")
print("confusion matrix:")
print(cm)
print()
print("Class order:", CLASSES)

## 未知データでの推論

In [None]:
# @markdown 未知データを音声合成で作るにあたっての設定です。

# TTS生成したい言葉
TTS_TEXT = "stop" # @param {type:"string"}

# TTS生成で使う言語
TTS_LANGUAGE = 'en' # @param ["en", "ja"]

# TTS生成で使うアクセント（アメリカ、イギリス、インド）
TTS_ACCENT = 'com' # @param ["com", "co.uk", "co.in"]

In [None]:
# @markdown 未知データを音声合成で作ります。

try:
    from gtts import gTTS
except ImportError:
    !pip install gTTS
    from gtts import gTTS

tts = gTTS(text=TTS_TEXT, lang=TTS_LANGUAGE, tld=TTS_ACCENT)
tts.save("tts.wav")
print("saved: tts.wav")
display(Audio("tts.wav"))
print(TTS_TEXT)

In [None]:
# @markdown 作った音声データに対し、無音区間削除、正規化、MFCC、ESN推論まで一気に実施します

import numpy as np
import librosa

def mfcc_from_wavfile(path):
    # load
    wav, _ = librosa.load(path, sr=SR)

    # trim silence (same as training)
    wav_trim = trim_silence_rms(
        wav, sr=SR,
        frame_length=WIN_LEN,
        hop_length=HOP_LEN,
        rms_thresh=0.02
    )

    # normalize (same as training)
    wav_norm = wav_trim / (np.max(np.abs(wav_trim)) + 1e-8)

    # MFCC (same params as training)
    mfcc = librosa.feature.mfcc(
        y=wav_norm,
        sr=SR,
        n_mfcc=N_MFCC,
        hop_length=HOP_LEN,
        n_fft=WIN_LEN,
        center=True
    ).T.astype(np.float32)  # (T, D)

    # MFCC post-normalize (same as your MFCC code)
    mfcc = mfcc / (np.max(np.abs(mfcc)) + 1e-8)

    return mfcc

# --- 推論したいファイル ---
path = "tts.wav"   # 先に作ったTTS音声

x_one = mfcc_from_wavfile(path)        # (T, D)
Y_lin = esn.predict_proba([x_one])     # (1, num_classes)
pred  = int(np.argmax(Y_lin[0]))

print("Predicted:", CLASSES[pred], "( id =", pred, ")")
print("")
print("Scores (mean linear output):", Y_lin[0])
print("Class order:", CLASSES)