# SBV2学習データ準備用のGoogle Colabスクリプト
公式の下記の実装を使わせていただいております。

https://github.com/litagin02/Style-Bert-VITS2/blob/master/colab.ipynb

「ランタイム」→「ランタイムのタイプを変更」

から、「T4 GPU」に変更してから、「Shift+Enter」で上からセルを実行してください。

# Google Driveのマウント
認証が入るので、最初のセルに置くのをオススメします。

In [None]:
#Google Driveのフォルダをマウント（認証入る）
from google.colab import drive
drive.mount('/content/drive')

# 必要パッケージのインストール

In [None]:
# このセルを実行して環境構築してください。
# エラーダイアログ「WARNING: The following packages were previously imported in this runtime: [pydevd_plugins]」が出るが「キャンセル」を選択して続行してください。

import os

os.environ["PATH"] += ":/root/.cargo/bin"

!curl -LsSf https://astral.sh/uv/install.sh | sh
!git clone https://github.com/litagin02/Style-Bert-VITS2.git
%cd Style-Bert-VITS2/
!uv pip install --system -r requirements.txt

# 各種パスやモデル名の指定

In [None]:
model_name = "amitaro"

# Google Driveでのパスを特定する
import glob
import os
drive_path = os.path.dirname(glob.glob('/content/drive/MyDrive/**/colab_SBV2train_sample/SBV2-prepare.ipynb', recursive=True)[0])
print(drive_path)

# 学習に必要なファイルや途中経過が保存されるディレクトリ
dataset_root = f"{drive_path}/Data"

# 学習結果（音声合成に必要なファイルたち）が保存されるディレクトリ
assets_root = f"{drive_path}/model_assets"

import yaml
with open("configs/paths.yml", "w", encoding="utf-8") as f:
    yaml.dump({"dataset_root": dataset_root, "assets_root": assets_root}, f)

# 音声ファイルを学習可能な適切な長さに分割するスクリプト

本家のスクリプトでは、大きな動画データを複数処理する際に、無料版のColabではRAMオーバーをしてしまうため、処理時間を少し犠牲にする代わりに、無料のColabでも動作できるスクリプトを実装します

In [None]:

#無料版のRAM12.7GBしかないColabでも実行可能な「slice.py」を実装

py_slice = """
import argparse
import shutil
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import Any, Optional
import gc

import soundfile as sf
import torch
from tqdm import tqdm

from config import get_path_config
from style_bert_vits2.logging import logger
from style_bert_vits2.utils.stdout_wrapper import SAFE_STDOUT


def is_audio_file(file: Path) -> bool:
    supported_extensions = [".wav", ".flac", ".mp3", ".ogg", ".opus", ".m4a"]
    return file.suffix.lower() in supported_extensions


def get_stamps(
    vad_model: Any,
    utils: Any,
    audio_file: Path,
    min_silence_dur_ms: int = 700,
    min_sec: float = 2,
    max_sec: float = 12,
):
    (get_speech_timestamps, _, read_audio, *_) = utils
    sampling_rate = 16000  # 16kHzか8kHzのみ対応

    min_ms = int(min_sec * 1000)

    wav = read_audio(str(audio_file), sampling_rate=sampling_rate)
    speech_timestamps = get_speech_timestamps(
        wav,
        vad_model,
        sampling_rate=sampling_rate,
        min_silence_duration_ms=min_silence_dur_ms,
        min_speech_duration_ms=min_ms,
        max_speech_duration_s=max_sec,
    )

    return speech_timestamps


def split_wav(
    vad_model: Any,
    utils: Any,
    audio_file: Path,
    target_dir: Path,
    min_sec: float = 2,
    max_sec: float = 12,
    min_silence_dur_ms: int = 700,
    time_suffix: bool = False,
) -> tuple[float, int]:
    margin: int = 200  # ミリ秒単位で、音声の前後に余裕を持たせる
    speech_timestamps = get_stamps(
        vad_model=vad_model,
        utils=utils,
        audio_file=audio_file,
        min_silence_dur_ms=min_silence_dur_ms,
        min_sec=min_sec,
        max_sec=max_sec,
    )

    data, sr = sf.read(audio_file)

    total_ms = len(data) / sr * 1000

    file_name = audio_file.stem
    target_dir.mkdir(parents=True, exist_ok=True)

    total_time_ms: float = 0
    count = 0

    # タイムスタンプに従って分割し、ファイルに保存
    for i, ts in enumerate(speech_timestamps):
        start_ms = max(ts["start"] / 16 - margin, 0)
        end_ms = min(ts["end"] / 16 + margin, total_ms)

        start_sample = int(start_ms / 1000 * sr)
        end_sample = int(end_ms / 1000 * sr)
        segment = data[start_sample:end_sample]

        if time_suffix:
            file = f"{file_name}-{int(start_ms)}-{int(end_ms)}.wav"
        else:
            file = f"{file_name}-{i}.wav"
        sf.write(str(target_dir / file), segment, sr)
        total_time_ms += end_ms - start_ms
        count += 1

    # メモリを解放
    del data
    gc.collect()

    return total_time_ms / 1000, count


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--min_sec", "-m", type=float, default=2, help="Minimum seconds of a slice")
    parser.add_argument("--max_sec", "-M", type=float, default=12, help="Maximum seconds of a slice")
    parser.add_argument("--input_dir", "-i", type=str, default="inputs", help="Directory of input wav files")
    parser.add_argument("--model_name", type=str, required=True, help="Output will be in Data/{model_name}/raw/")
    parser.add_argument("--min_silence_dur_ms", "-s", type=int, default=700, help="Silence above this duration (ms)")
    parser.add_argument("--time_suffix", "-t", action="store_true", help="Add time suffix to output file names")
    parser.add_argument("--num_processes", type=int, default=3, help="Number of parallel processes. Default is 3.")
    args = parser.parse_args()

    path_config = get_path_config()
    dataset_root = path_config.dataset_root

    model_name = str(args.model_name)
    input_dir = Path(args.input_dir)
    output_dir = dataset_root / model_name / "raw"
    min_sec: float = args.min_sec
    max_sec: float = args.max_sec
    min_silence_dur_ms: int = args.min_silence_dur_ms
    time_suffix: bool = args.time_suffix
    num_processes: int = args.num_processes

    audio_files = [file for file in input_dir.rglob("*") if is_audio_file(file)]

    logger.info(f"Found {len(audio_files)} audio files.")
    if output_dir.exists():
        logger.warning(f"Output directory {output_dir} already exists, deleting...")
        shutil.rmtree(output_dir)

    # バッチサイズの設定
    batch_size = 1  # 一度に処理するファイル数
    from math import ceil
    num_batches = ceil(len(audio_files) / batch_size)

    # モデルをダウンロード
    _ = torch.hub.load(
        repo_or_dir="litagin02/silero-vad",
        model="silero_vad",
        onnx=True,
        trust_repo=True,
    )

    # 並列処理の準備
    def process_queue(
        q: Queue[Optional[Path]],
        result_queue: Queue[tuple[float, int]],
        error_queue: Queue[tuple[Path, Exception]],
    ):
        vad_model, utils = torch.hub.load(
            repo_or_dir="litagin02/silero-vad",
            model="silero_vad",
            onnx=True,
            trust_repo=True,
        )
        while True:
            file = q.get()
            if file is None:
                q.task_done()
                break
            try:
                rel_path = file.relative_to(input_dir)
                time_sec, count = split_wav(
                    vad_model=vad_model,
                    utils=utils,
                    audio_file=file,
                    target_dir=output_dir / rel_path.parent,
                    min_sec=min_sec,
                    max_sec=max_sec,
                    min_silence_dur_ms=min_silence_dur_ms,
                    time_suffix=time_suffix,
                )
                result_queue.put((time_sec, count))
            except Exception as e:
                logger.error(f"Error processing {file}: {e}")
                error_queue.put((file, e))
                result_queue.put((0, 0))
            finally:
                q.task_done()

    for batch_idx in range(num_batches):
        start_idx = batch_idx * batch_size
        end_idx = min((batch_idx + 1) * batch_size, len(audio_files))
        batch_files = audio_files[start_idx:end_idx]

        q: Queue[Optional[Path]] = Queue()
        result_queue: Queue[tuple[float, int]] = Queue()
        error_queue: Queue[tuple[Path, Exception]] = Queue()

        threads = [
            Thread(target=process_queue, args=(q, result_queue, error_queue))
            for _ in range(min(num_processes, len(batch_files)))
        ]

        for t in threads:
            t.start()

        pbar = tqdm(total=len(batch_files), file=SAFE_STDOUT)
        for file in batch_files:
            q.put(file)

        # 結果の集計
        total_sec = 0
        total_count = 0
        for _ in range(len(batch_files)):
            time, count = result_queue.get()
            total_sec += time
            total_count += count
            pbar.update(1)

        q.join()

        for _ in range(len(threads)):
            q.put(None)

        for t in threads:
            t.join()

        pbar.close()

    if not error_queue.empty():
        error_str = "Error slicing some files:"
        while not error_queue.empty():
            file, e = error_queue.get()
            error_str += f"{file}: {e}"
        raise RuntimeError(error_str)

    logger.info(f"Slice done! Total time: {total_sec / 60:.2f} min, {total_count} files.")

"""

with open("./slice.py", "w") as f:
    f.write(py_slice)


In [None]:
# 元となる音声ファイル（wav形式）を入れるディレクトリ
input_dir = f"{drive_path}/inputs"

!python slice.py -i {input_dir} --model_name {model_name}


# 分割した音声ファイルに対して、音声認識モデルWhisperによる文字起こしを実施するスクリプト

より学習の品質を上げたい場合は、自動的に行なった文字起こしの後に、自分で音声を確認しながら、`Data/{model_name}/esd.list`の文字起こしを修正することをおすすめします。

In [None]:
# こういうふうに書き起こして欲しいという例文（句読点の入れ方・笑い方や固有名詞等）
initial_prompt = "こんにちは。元気、ですかー？ふふっ、私は……ちゃんと元気だよ！"

!python transcribe.py --model_name {model_name} --initial_prompt {initial_prompt} --use_hf_whisper

# 実行後Colabノードブックのランタイムを削除する

特に、有料版のColabでは、接続時間に応じてコンピューティングユニットが消費されてしまうので、節約のため、学習完了したら即座にColabのランタイムを削除する必要がある。

下記セルにおいて、`FLAG = True`として、実行すると、ランタイムが接続解除される

In [None]:
# ノートブックの解放
FLAG = False

from google.colab import runtime

if FLAG:
    runtime.unassign()