In [3]:
# https://dev.classmethod.jp/articles/trial-python-webrtcvad/#toc-py-webrtcvad

In [None]:
import collections
import contextlib
import sys
import wave
import pathlib

import webrtcvad


def read_audio(path):
    """MP3またはWAVファイルを読み込みます。

    パスを受け取り、(PCMオーディオデータ, サンプルレート)を返します。
    """
    import pydub

    audio = pydub.AudioSegment.from_file(path)
    
    # モノラルに変換
    if audio.channels > 1:
        audio = audio.set_channels(1)
    
    # サンプルレートを確認
    sample_rate = audio.frame_rate
    assert sample_rate in (8000, 16000, 32000, 48000), f"サポートされていないサンプルレート: {sample_rate}"
    
    # PCMデータを取得
    pcm_data = audio.raw_data
    
    return pcm_data, sample_rate

def read_wave(path):
    """WAVファイルを読み込みます。

    パスを受け取り、(PCMオーディオデータ, サンプルレート)を返します。
    """
    with contextlib.closing(wave.open(path, 'rb')) as wf:
        num_channels = wf.getnchannels()
        assert num_channels == 1
        sample_width = wf.getsampwidth()
        assert sample_width == 2
        sample_rate = wf.getframerate()
        assert sample_rate in (8000, 16000, 32000, 48000)
        pcm_data = wf.readframes(wf.getnframes())
        return pcm_data, sample_rate

def write_wave(path, audio, sample_rate):
    """WAVファイルを書き込みます。

    パス、PCMオーディオデータ、サンプルレートを受け取ります。
    """
    with contextlib.closing(wave.open(path, 'wb')) as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(audio)


class Frame(object):
    """オーディオデータの「フレーム」を表現するクラス"""
    def __init__(self, bytes, timestamp, duration):
        self.bytes = bytes
        self.timestamp = timestamp
        self.duration = duration


def frame_generator(frame_duration_ms, audio, sample_rate):
    """PCMオーディオデータからオーディオフレームを生成します。

    フレーム長（ミリ秒）、PCMデータ、サンプルレートを受け取り、
    指定された長さのフレームを生成します。
    """
    n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
    offset = 0
    timestamp = 0.0
    duration = (float(n) / sample_rate) / 2.0
    while offset + n < len(audio):
        yield Frame(audio[offset:offset + n], timestamp, duration)
        timestamp += duration
        offset += n


def vad_collector(sample_rate: int, frame_duration_ms: int,
    padding_duration_ms: int, vad: webrtcvad.Vad, frames: list[Frame],
    voice_trigger_on_thres: float=0.9, voice_trigger_off_thres: float=0.1) -> list[dict]:
    """音声非音声セグメント処理

    Args:
        sample_rate (int): 単位時間あたりのサンプル数[Hz]
        frame_duration_ms (int): フレーム長
        padding_duration_ms (int): ガード長
        vad (webrtcvad.Vad): VADオブジェクト
        frames (list[Frame]): フレーム分割された音声データ
        voice_trigger_on_thres (float, optional): 音声セグメント開始と判断する閾値。デフォルト値は0.9
        voice_trigger_off_thres (float, optional): 音声セグメント終了と判断する閾値。デフォルト値は0.1

    Returns:
        list[dict]: セグメント結果
    """
    # ガードするフレーム数
    num_padding_frames = int(padding_duration_ms / frame_duration_ms)

    # フレームバッファ
    frame_buffer = []

    # 音声検出トリガーの状態
    triggered = False

    voiced_frames = []
    vu_segments = []

    for frame in frames:
        is_speech = vad.is_speech(frame.bytes, sample_rate)
        frame_buffer.append((frame, is_speech))

        # 非音声セグメントの処理
        if not triggered:
            # 過去フレームの音声判定数を計算
            num_voiced = len([f for f, speech in frame_buffer[-num_padding_frames:] if speech])

            # 音声セグメント開始の判定
            if num_voiced > voice_trigger_on_thres * num_padding_frames:
                triggered = True

                # 非音声セグメントの保存
                audio_data = b''.join([f.bytes for f, _ in frame_buffer[:-num_padding_frames]])
                vu_segments.append({"vad": 0, "audio_size": len(audio_data), "audio_data": audio_data})

                # 音声フレームの保持
                for f, _ in frame_buffer[-num_padding_frames:]:
                    voiced_frames.append(f)
                frame_buffer = []

        # 音声セグメントの処理
        else:
            voiced_frames.append(frame)

            # 過去フレームの非音声判定数を計算
            num_unvoiced = len([f for f, speech in frame_buffer[-num_padding_frames:] if not speech])

            # 音声セグメント終了の判定
            if num_unvoiced > (1 - voice_trigger_off_thres) * num_padding_frames:
                triggered = False

                # 音声セグメントの保存
                audio_data = b''.join([f.bytes for f in voiced_frames])
                vu_segments.append({"vad": 1, "audio_size": len(audio_data), "audio_data": audio_data})
                voiced_frames = []

                frame_buffer = []

    # 最終セグメントの処理
    if triggered:
        audio_data = b''.join([f.bytes for f in voiced_frames])
        vu_segments.append({"vad": 1, "audio_size": len(audio_data), "audio_data": audio_data})
    else:
        audio_data = b''.join([f.bytes for f, _ in frame_buffer])
        vu_segments.append({"vad": 0, "audio_size": len(audio_data), "audio_data": audio_data})

    return vu_segments

In [2]:
def split_segments(folder_path, file_name):
    # wav読込
    audio_data, sample_rate = read_audio(folder_path + file_name)

    # VADクラス
    vad = webrtcvad.Vad(0)

    # フレーム分割
    frames = frame_generator(30, audio_data, sample_rate)
    frames = list(frames)

    # セグメント結果
    # vu_segments = vad_collector(sample_rate, 30, 300, vad, frames)
    vu_segments = vad_collector(sample_rate, 30, 300, vad, frames, voice_trigger_off_thres=0.8)


    # 初期化
    vu_segments_merged = []
    count = 0
    while count < len(vu_segments):
        s = vu_segments[count].copy()

        # 5秒以下なら次のセグメントとマージ
        while s["audio_size"] < 5 * 2 * sample_rate:
            # 次のセグメントがない場合は強制終了
            if count == len(vu_segments) - 1:
                break

            # マージ処理
            s["audio_size"] = s["audio_size"] + vu_segments[count+1]["audio_size"]
            s["audio_data"] = s["audio_data"] + vu_segments[count+1]["audio_data"]
            count += 1

        # マージされたセグメントを格納
        vu_segments_merged.append(s)
        count += 1

    # wavファイル格納先作成
    segments_dir = pathlib.Path("./segments_merged/")
    segments_dir.mkdir(parents=True, exist_ok=True)

    # レコード作成しつつwavファイル出力
    for_df = []
    for i, segment in enumerate(vu_segments_merged):
        path = segments_dir.joinpath(f"{file_name.split('.')[0]}_segment-{i:03d}.wav")
        write_wave(str(path), segment['audio_data'], sample_rate)

In [None]:
import os
for file in os.listdir("./mp3/"):
    try:
        split_segments("./mp3/", file)
    except:
        print(file)

In [None]:
len(os.listdir("./segments_merged/"))