## デシベルと、時間長の閾値を用いて、pauseを抽出する。

## 準備
### ライブラリのインポート

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pathlib import Path
import librosa
import librosa.display
from scipy import ndimage
from IPython.display import Audio

### データの読み込み

In [None]:
# Directory that receives this notebook's pickled output table.
DATA_DIR = Path("/Users/takeshitashunji/Programming/Python/PausePrediction/data")
# Fail early with an explicit, descriptive error; a bare `assert` carries no
# message and is skipped entirely when Python runs with -O.
if not DATA_DIR.exists():
    raise FileNotFoundError(f"DATA_DIR does not exist: {DATA_DIR}")

In [None]:
# Root of the local JVS corpus copy (note the space in the directory name).
jvs_path = Path("/Users/takeshitashunji/Downloads/jvs_ver1 2")

# Speaker IDs "001" .. "100" (used below as directories jvs001 .. jvs100).
jvs_list = [f"{i:03}" for i in range(1, 101)]

# Build a table with one row per wav file, paired with the path where the
# matching label file is expected.
columns = ["jvs", "speach_type", "wav_path", "lab_path"]

rows = []
for jvs in jvs_list:
    for speach_type in ["parallel100", "nonpara30"]:
        speech_dir = jvs_path / f"jvs{jvs}" / speach_type
        # Sort the matches: Path.glob yields entries in arbitrary,
        # filesystem-dependent order, which would make the row order (and
        # every later `df_jvs.iloc[0]`) differ between machines and runs.
        for wav_path in sorted((speech_dir / "wav24kHz16bit").glob("*.wav")):
            lab_path = speech_dir / "lab" / "mon" / f"{wav_path.stem}.lab"
            rows.append([jvs, speach_type, wav_path, lab_path])

df_jvs = pd.DataFrame(rows, columns=columns)

df_jvs

## 音声波形を抽出する

In [None]:
# Load the waveform of an audio file.
def extract_audio_waveform(audio_file_path, sr=24000):
    """Load an audio file as a mono waveform at the requested sampling rate.

    Args:
        audio_file_path: Path of the audio file to read.
        sr (int | None): Target sampling rate in Hz, handed to
            ``librosa.load``. Defaults to 24000; ``None`` keeps the file's
            native rate.

    Returns:
        tuple: ``(waveform, sample_rate)`` exactly as returned by
        ``librosa.load``.
    """
    # Forward `sr` explicitly: when it is omitted, librosa.load resamples to
    # its own default of 22050 Hz, which contradicts the 24000 Hz rate that
    # the rest of this notebook uses for time axes and pause durations.
    waveform, sample_rate = librosa.load(audio_file_path, sr=sr, mono=True)
    return waveform, sample_rate


# Load the first row's audio, then inspect its samples, shape and waveform.
first_wav_path = df_jvs.iloc[0]["wav_path"]
wav, sr = extract_audio_waveform(first_wav_path)
print(wav, wav.shape, sep="\n")
librosa.display.waveshow(wav, sr=sr)

In [None]:
# Add a column holding the waveform of every file
def _waveform_only(path):
    """Return just the sample array for ``path``, requesting 24000 Hz."""
    samples, _ = extract_audio_waveform(path, 24000)
    return samples


df_jvs["wave_sequence"] = df_jvs["wav_path"].apply(_waveform_only)
df_jvs

## db列へ変換する

In [None]:
# Convert the waveform amplitudes to decibels and plot the resulting sequence.
db = librosa.amplitude_to_db(wav)
plt.plot(db)

In [None]:
# Add a dB sequence column computed from each row's stored waveform.
df_jvs["db_sequence"] = df_jvs["wave_sequence"].apply(librosa.amplitude_to_db)
df_jvs.head()

## dbと音声長の閾値から、pauseの位置を設定する

- 連続している部分の抽出
  - https://qiita.com/studio_haneya/items/bce843eacb345dfaa97d
  - https://qiita.com/isourou/items/a7c32d35a206ec785a6f # これいいね
  - https://detail.chiebukuro.yahoo.co.jp/qa/question_detail/q12260522332

In [None]:
import numpy as np


def run_length_encoding(arr, min_run_length=3):
    """Mark every element that belongs to a sufficiently long run of equal values.

    The array is split into maximal runs of identical consecutive values. An
    element is flagged True when its run spans at least ``min_run_length``
    elements, regardless of which value the run holds.

    Parameters:
        arr (numpy.ndarray): One-dimensional array to scan for runs.
        min_run_length (int): Smallest run length that counts as continuous
            (default 3).

    Returns:
        numpy.ndarray: Boolean array with one entry per element of ``arr``.
    """
    # Positions whose value differs from the previous one, i.e. where a new run begins.
    change_points = np.nonzero(np.diff(arr))[0] + 1
    # Gaps between consecutive run boundaries (0, change points, len(arr)) are the run lengths.
    segment_sizes = np.diff(change_points, prepend=0, append=len(arr))
    # Expand the per-run verdict back to one flag per element.
    long_enough = segment_sizes >= min_run_length
    return np.repeat(long_enough, segment_sizes)


# Sample array
arr = np.array([0, 0, 0, 1, 2, 3, 4, 0, 0, 0, 1, 2])

# Run the helper on the sample to obtain the boolean mask
rle_result = run_length_encoding(arr)

print(rle_result)

In [None]:
# Threshold settings
db_threshold = -50  # dB level; values below it are compared as silent in detect_pause_position
time_threshold = 50 / 1000  # 50 ms
sample_rate = 24000  # samples per second; converts time_threshold into a sample count


# Samples that stay below the dB threshold for long enough are treated as pause.
def detect_pause_position(db_sequence, db_threshold, time_threshold, sample_rate):
    """Flag the entries of a dB sequence that belong to a pause.

    An entry counts as pause when its value is below ``db_threshold`` and it
    lies in an unbroken stretch lasting at least ``time_threshold`` seconds.

    Args:
        db_sequence (np.array): Waveform converted to dB.
        db_threshold (float): Values strictly below this are considered silent.
        time_threshold (float): Minimum duration, in seconds, of a silent stretch.
        sample_rate (int): Samples per second, used to turn the duration into
            a sample count.

    Returns:
        np.array: Boolean mask aligned with ``db_sequence``; True marks pause.
    """
    min_samples = int(time_threshold * sample_rate)

    is_silent = db_sequence < db_threshold
    # run_length_encoding flags long runs of either state (silent or not), so
    # intersect with the silence mask to keep only the long silent runs.
    in_long_run = run_length_encoding(is_silent, min_samples)

    return is_silent & in_long_run


# Detect the pauses of the first row using the thresholds defined in this cell.
db_sequence = df_jvs.iloc[0]["db_sequence"]
pause_position = detect_pause_position(
    db_sequence, db_threshold, time_threshold, sample_rate
)

In [None]:
# Show the detected pause regions on top of the dB curve

fig, ax = plt.subplots(figsize=(20, 5))
x = np.arange(len(db_sequence)) / sample_rate
ax.plot(x, db_sequence)

# Horizontal line at the dB threshold
threshold_line_style = dict(color="r", linestyle="-", linewidth=2, alpha=0.7)
ax.axhline(y=db_threshold, label="db_threshold", **threshold_line_style)

# Shade the pause regions over the fixed -80..0 band
ax.fill_between(x, -80, 0, where=pause_position, facecolor="b", alpha=0.5)

ax.legend()
plt.show()

In [None]:
# Reload the first row's audio, print and plot it, then offer it for playback.
first_wav_path = df_jvs.iloc[0]["wav_path"]
wav, sr = extract_audio_waveform(first_wav_path)
print(wav, wav.shape, sep="\n")

plt.figure(figsize=(20, 5))
librosa.display.waveshow(wav, sr=sr)
plt.show()

Audio(wav, rate=sr)

In [None]:
# Threshold settings: dB level, minimum duration (50 ms) and sampling rate
db_threshold, time_threshold, sample_rate = -50, 50 / 1000, 24000

# Record the settings next to the data they were applied to
df_jvs["db_threshold"] = db_threshold
df_jvs["time_threshold"] = time_threshold
df_jvs["sr"] = sample_rate

# Pause mask for every row's dB sequence
df_jvs["pause_position"] = df_jvs["db_sequence"].apply(
    lambda seq: detect_pause_position(
        seq, db_threshold, time_threshold, sample_rate
    )
)

df_jvs

In [None]:
# Persist the table twice: a CSV in the current working directory and a
# pickle under DATA_DIR.
# NOTE(review): the array-valued columns (wave_sequence, db_sequence,
# pause_position) presumably end up as text representations in the CSV, so
# only the pickle is likely to round-trip them — confirm before relying on
# the CSV.
df_jvs.to_csv("jvs-pause-visualize.csv", index=False)
df_jvs.to_pickle(DATA_DIR / "jvs-pause-visualize.pkl")

## Juliusの音素アライメントを可視化する

In [None]:
# Load a label (.lab) file
def read_lab(lab_path):
    """Read a phoneme label file into a DataFrame.

    Every non-blank line must hold three whitespace-separated fields:
    start time, end time and phoneme label.

    Args:
        lab_path: Path of the label file.

    Returns:
        pandas.DataFrame | None: One row per labelled line with the columns
        ``start``, ``end``, ``phoneme``, ``phoneme_idx`` (0-based line number
        within the file) and ``duration`` (``end - start``). ``None`` is
        returned, after printing a message, when the file does not exist.

    Raises:
        ValueError: If a non-blank line does not split into exactly three
            fields, or a time field cannot be parsed as a float.
    """
    columns = ["start", "end", "phoneme", "phoneme_idx", "duration"]

    # Missing file: report it and let the caller decide what to do
    if not Path(lab_path).exists():
        print(f"{lab_path} does not exist.")
        return None

    records = []
    with open(lab_path, "r") as f:
        for phoneme_idx, line in enumerate(f):
            # Lines read from a file keep their trailing newline, so a blank
            # line is "\n" rather than ""; test the stripped text instead.
            if not line.strip():
                continue
            start, end, phoneme = line.split()
            start, end = float(start), float(end)
            records.append(
                {
                    "start": start,
                    "end": end,
                    "phoneme": phoneme,
                    "phoneme_idx": phoneme_idx,
                    "duration": end - start,
                }
            )
    # Explicit columns keep the schema and column order even when the file
    # contained no labelled line.
    return pd.DataFrame(records, columns=columns)


# Preview the parsed labels of the first row.
read_lab(df_jvs.iloc[0]["lab_path"])

In [None]:
def plot_phoneme_alignment(lab_path):
    """Plot the phoneme alignment stored in a label file.

    Draws a dashed vertical line at the start of every phoneme and writes the
    phoneme label midway between its start and end. Nothing is drawn when the
    label file is missing or contains no rows.

    Args:
        lab_path: Path of the label file, as accepted by ``read_lab``.
    """
    df = read_lab(lab_path)
    # read_lab returns None (after printing a message) for a missing file, and
    # an empty table has nothing to draw.
    if df is None or df.empty:
        return

    fig, ax = plt.subplots(figsize=(20, 2))
    for start, end, label in zip(df["start"], df["end"], df["phoneme"]):
        ax.axvline(start, color="gray", linestyle="--")
        ax.text((start + end) / 2, 0.5, label, ha="center", va="bottom", fontsize=8)
    ax.set_yticks([])
    ax.set_xlim(0, df["end"].max())
    ax.set_xlabel("Time (seconds)")
    # No artist carries a label, so no legend is requested; asking for one
    # would only produce a "no artists with labels" warning.
    plt.show()


# Plot the alignment for the row labelled 0.
plot_phoneme_alignment(df_jvs.loc[0, "lab_path"])

## 並べて可視化する

In [None]:
# Three stacked panels for the first row: waveform, dB curve with pauses,
# and phoneme alignment.
fig, axes = plt.subplots(
    3, 1, figsize=(20, 10), gridspec_kw={"height_ratios": [4, 4, 2]}
)
ax_wave, ax_db, ax_lab = axes
first_row = df_jvs.iloc[0]

# Panel 1: raw waveform against time
wav, sr = extract_audio_waveform(first_row["wav_path"])
wave_times = np.arange(len(wav)) / sr
ax_wave.plot(wave_times, wav)
ax_wave.set_title("Original audio signal")
ax_wave.set_ylabel("Amplitude")
ax_wave.set_xlim(0, len(wav) / sr)

# Panel 2: dB curve, threshold line and shaded pause regions
db_sequence = first_row["db_sequence"]
pause_position = detect_pause_position(
    db_sequence, db_threshold, time_threshold, sample_rate
)
x = np.arange(len(db_sequence)) / sample_rate
ax_db.plot(x, db_sequence)
threshold_line_style = dict(color="r", linestyle="-", linewidth=2, alpha=0.7)
ax_db.axhline(y=db_threshold, label="db_threshold", **threshold_line_style)
ax_db.fill_between(x, -80, 0, where=pause_position, facecolor="b", alpha=0.5)
ax_db.set_title("Audio db signal with the pause positions")
ax_db.set_ylabel("Amplitude (db)")
ax_db.set_xlim(0, len(db_sequence) / sample_rate)

# Panel 3: phoneme boundaries with each label centred in its interval
df = read_lab(first_row["lab_path"])
for start, end, label in zip(df["start"], df["end"], df["phoneme"]):
    ax_lab.axvline(start, color="gray", linestyle="--")
    ax_lab.text((start + end) / 2, 0.5, label, ha="center", va="bottom", fontsize=8)
ax_lab.set_yticks([])
ax_lab.set_xlim(0, df["end"].max())
ax_lab.set_xlabel("Time (seconds)")
ax_lab.set_title("Phoneme alignment")
plt.show()

In [None]:
def plot_audio_features(
    df_jvs,
    db_threshold=-50,
    time_threshold=50 / 1000,
    sample_rate=24000,
    idxloc=0,
    savefig=False,
):
    """Plot waveform, dB curve with detected pauses, and phoneme alignment for one row.

    All three panels share the x-range ``0 .. len(wav) / sr`` seconds.

    Args:
        df_jvs (pandas.DataFrame): Table providing the ``wav_path``,
            ``db_sequence`` and ``lab_path`` columns read here.
        db_threshold (float): dB threshold passed to ``detect_pause_position``
            and drawn as a horizontal line.
        time_threshold (float): Minimum pause duration in seconds, passed to
            ``detect_pause_position``.
        sample_rate (int): Sampling rate in Hz used to load the audio, to
            detect pauses and to build the time axis of the dB panel.
        idxloc (int): Positional index of the row to plot.
        savefig (bool): When True, save the figure as
            ``<wav stem>-visualize.png`` in the directory of the wav file.
    """
    row = df_jvs.iloc[idxloc]
    wav_path = Path(row["wav_path"])

    fig, axes = plt.subplots(
        3, 1, figsize=(20, 10), gridspec_kw={"height_ratios": [4, 4, 2]}
    )
    ax_wave, ax_db, ax_lab = axes

    # Panel 1: the original audio signal.
    # Request `sample_rate` from the loader so the waveform is on the same
    # time base as the dB panel below.
    wav, sr = extract_audio_waveform(wav_path, sr=sample_rate)
    duration = len(wav) / sr
    ax_wave.plot(np.arange(len(wav)) / sr, wav)
    ax_wave.set_title("Original audio signal")
    ax_wave.set_ylabel("Amplitude")
    ax_wave.set_xlim(0, duration)

    # Panel 2: the dB sequence with threshold line and shaded pause regions.
    # NOTE(review): assumes the stored db_sequence was computed from audio at
    # `sample_rate` — confirm against how the column was built.
    db_sequence = row["db_sequence"]
    pause_position = detect_pause_position(
        db_sequence, db_threshold, time_threshold, sample_rate
    )
    x = np.arange(len(db_sequence)) / sample_rate
    ax_db.plot(x, db_sequence)
    ax_db.axhline(
        y=db_threshold,
        color="r",
        linestyle="-",
        linewidth=2,
        alpha=0.7,
        label="db_threshold",
    )
    ax_db.fill_between(x, -80, 0, where=pause_position, facecolor="b", alpha=0.5)
    ax_db.set_title("Audio db signal with the pause positions")
    ax_db.set_ylabel("Amplitude (db)")
    ax_db.set_xlim(0, duration)

    # Panel 3: the phoneme alignment.
    # read_lab returns None for a missing label file; in that case (or for an
    # empty table) the panel is left without boundaries instead of crashing.
    df = read_lab(row["lab_path"])
    if df is not None and not df.empty:
        for start, end, label in zip(df["start"], df["end"], df["phoneme"]):
            ax_lab.axvline(start, color="gray", linestyle="--")
            ax_lab.text(
                (start + end) / 2, 0.5, label, ha="center", va="bottom", fontsize=8
            )
    ax_lab.set_yticks([])
    ax_lab.set_xlim(0, duration)
    ax_lab.set_xlabel("Time (seconds)")
    ax_lab.set_title("Phoneme alignment")

    plt.tight_layout()
    if savefig:
        fig.savefig(wav_path.with_name(f"{wav_path.stem}-visualize.png"))

    plt.show()
    # Release the figure so that calling this in a loop over many rows does
    # not keep accumulating open figures.
    plt.close(fig)

In [None]:
# Lower dB threshold (-70) with the default 50 ms duration threshold.
plot_audio_features(df_jvs, db_threshold=-70)

In [None]:
# dB threshold of -50 with a shorter 30 ms duration threshold.
plot_audio_features(df_jvs, db_threshold=-50, time_threshold=0.03)

In [None]:
# The function's default settings spelled out explicitly (-50 dB, 50 ms).
plot_audio_features(df_jvs, db_threshold=-50, time_threshold=0.05)

In [None]:
# Render every row in turn; savefig=True writes each figure into the
# directory of the corresponding wav file.
for idx in range(len(df_jvs)):
    plot_audio_features(df_jvs, idxloc=idx, savefig=True)

## ポーズ分析

閾値によって得られた、ポーズを以下の3つに分類する。
- sil
- PIP：句読点のポーズ
- RP：それ以外のポーズ

In [None]:
def _true_run_ranges(mask, min_run_length=3):
    """Return ``(start, end, length)`` for each run of True values in ``mask``
    that is at least ``min_run_length`` long (``end`` is exclusive)."""
    mask = np.asarray(mask, dtype=bool)
    if mask.size == 0:
        return []
    # Indices where the value changes, i.e. where a new run begins.
    change_points = np.where(np.diff(mask) != 0)[0] + 1
    starts = np.concatenate(([0], change_points))
    ends = np.concatenate((change_points, [mask.size]))
    return [
        (int(start), int(end), int(end - start))
        for start, end in zip(starts, ends)
        if mask[start] and end - start >= min_run_length
    ]


def classfy_pause(
    db_sequence, lab_path, db_threshold=-50, time_threshold=0.05, sample_rate=24000
):
    """Detect the pauses in a dB sequence and classify each one.

    A pause is labelled from the phonemes of the label file whose time
    interval overlaps it:

    - ``"sil"`` when any overlapping phoneme is ``sil``,
    - ``"PIP"`` when none is ``sil`` but one is ``pau``,
    - ``"RP"`` otherwise.

    Args:
        db_sequence (np.array): Waveform converted to dB.
        lab_path: Path of the phoneme label file, as accepted by ``read_lab``.
        db_threshold (float): dB threshold passed to ``detect_pause_position``.
        time_threshold (float): Minimum pause duration in seconds.
        sample_rate (int): Samples per second; converts label times and the
            duration threshold into sample counts.

    Returns:
        list: One ``[start, end, length, pause_type]`` entry per pause, where
        ``start``/``end`` are indices into ``db_sequence`` (``end`` exclusive)
        and ``length`` is ``end - start``.

    Raises:
        FileNotFoundError: If ``read_lab`` reports the label file as missing.
    """
    pause_position = detect_pause_position(
        db_sequence, db_threshold, time_threshold, sample_rate
    )
    sample_threshold = int(time_threshold * sample_rate)
    pause_ranges = _true_run_ranges(pause_position, sample_threshold)

    df_lab = read_lab(lab_path)
    if df_lab is None:
        raise FileNotFoundError(f"label file not found: {lab_path}")

    # Phoneme boundaries converted from label time to sample positions; these
    # do not depend on the pause, so compute them once.
    phoneme_start = df_lab["start"].to_numpy() * sample_rate
    phoneme_end = df_lab["end"].to_numpy() * sample_rate
    phonemes = df_lab["phoneme"].to_numpy()

    ans = []
    for pause_start, pause_end, pause_length in pause_ranges:
        # Interval-overlap test. Checking only whether a phoneme boundary
        # falls inside the pause would miss a pause lying entirely inside one
        # long phoneme (e.g. within a single sil/pau segment).
        overlaps = (phoneme_start <= pause_end) & (phoneme_end >= pause_start)
        overlapping = set(phonemes[overlaps])

        if "sil" in overlapping:
            pause_type = "sil"
        elif "pau" in overlapping:
            pause_type = "PIP"
        else:
            pause_type = "RP"

        ans.append([pause_start, pause_end, pause_length, pause_type])

    return ans


# 各pauseに対して、start, end, duration, 分類を求める

# df = read_lab(df_jvs.iloc[0]['lab_path'])

# Classify the pauses of the first row; each entry of the result is
# [start, end, length, pause_type].
ans = classfy_pause(df_jvs.iloc[0]["db_sequence"], df_jvs.iloc[0]["lab_path"])

ans