# FT用データ生成スクリプト

In [1]:
# パッケージインストール
!pip install -r requirements.sbv.txt

!conda install -y -c conda-forge kaldi \
pynini

[1;33mJupyter detected[0m[1;33m...[0m
[1;32m2[0m[1;32m channel Terms of Service accepted[0m
Channels:
 - conda-forge
 - defaults
Platform: linux-64
Collecting package metadata (repodata.json): done
Solving environment: done


    current version: 25.7.0
    latest version: 25.9.1

Please update conda by running

    $ conda update -n base -c conda-forge conda



# All requested packages already installed.



In [5]:
!pip list

Package                                  Version
---------------------------------------- ------------
aiohappyeyeballs                         2.6.1
aiohttp                                  3.13.1
aiosignal                                1.4.0
annotated-types                          0.7.0
anyio                                    4.11.0
argon2-cffi                              25.1.0
argon2-cffi-bindings                     25.1.0
arrow                                    1.4.0
asttokens                                3.0.0
async-lru                                2.0.5
attrs                                    25.4.0
audioread                                3.0.1
babel                                    2.17.0
backoff                                  2.2.1
bcrypt                                   5.0.0
beautifulsoup4                           4.14.2
bleach                                   6.2.0
blis                                     1.3.0
build                                    1.3

In [3]:
# One-time MFA setup, kept commented out; uncomment to download the models.
# # mfa
# # Download the Japanese pronunciation dictionary
# !mfa model download dictionary japanese_mfa

# # Download the Japanese acoustic model
# !mfa model download acoustic japanese_mfa

## テキスト対話データ生成

In [4]:
import ast
import os
import sys
from typing import Literal

import numpy as np
from dotenv import load_dotenv
# NOTE(review): the recorded output of this cell is
# "ImportError: cannot import name 'hub' from 'langchain'", i.e. the installed
# langchain no longer exposes `hub`. `hub.pull(...)` is used further down, so
# this import (or that call) has to be migrated before the notebook can run.
from langchain import hub
from langchain.schema import HumanMessage
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders import PDFMinerLoader
from langchain_community.vectorstores.chroma import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
# NOTE(review): later cells call `librosa.resample`, `sf.write` and `sf.read`,
# but neither `librosa` nor `sf` (presumably `soundfile`) is imported anywhere
# in this notebook. Add those imports once the packages are confirmed to be
# part of requirements.sbv.txt.


# Load environment variables from the .env file
load_dotenv()

ImportError: cannot import name 'hub' from 'langchain' (/home1/s1f102201582/anaconda3/envs/sbv-tts/lib/python3.12/site-packages/langchain/__init__.py)

In [None]:
# Configuration: every tunable value used by the later cells lives here.
from os.path import join, expanduser

# API key and base URL passed to the OpenAI-compatible embeddings client
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
BASE_URL = "https://api.openai.iniad.org/api/v1"
# Model name and sampling temperature passed to the chat-model client
MODEL='gemini-2.5-flash'
TEMPERATURE = 1.0
# Restrict CUDA to these device indices
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"

# Sampling rate (Hz) of the generated audio
setting_sr = 16000

# Number of spoken dialogues to generate
gen_dial_num = 1

# Whether to delete previously generated dialogue data before generating
IS_REMOVE_EXIST_FILE = True

# Output folders for the JSON transcriptions and the audio used for fine-tuning
home_dir = expanduser("~")
json_dir_path = join(home_dir, "Github/jmoshi-ft/gen_dialogue/data/mfa/transcription")
audio_dir_path = join(home_dir, "Github/jmoshi-ft/gen_dialogue/data/mfa/audio")

# MFA-related paths: acoustic model archive, aligner input and aligner output
model_dir = join(home_dir, "Documents/MFA/pretrained_models/acoustic/japanese_mfa.zip")
mfa_input_dir = join(home_dir, "Github/jmoshi-ft/gen_dialogue/data/mfa/mfa_input")
mfa_output_dir = join(home_dir, "Github/jmoshi-ft/gen_dialogue/data/mfa/mfa_output")

In [None]:
# Directories that must exist before any data is generated.
base_paths = [
    json_dir_path,
    audio_dir_path,
    mfa_input_dir,
    mfa_output_dir,
]

for p in base_paths:
    # exist_ok=True replaces the isdir()-then-makedirs() pair, which could fail
    # if the directory appeared between the check and the creation.
    os.makedirs(p, exist_ok=True)

In [None]:
# Create the chat-model client from the configured model name and temperature
llm = ChatGoogleGenerativeAI(model=MODEL, temperature=TEMPERATURE)

In [None]:
# Load every PDF matching "*.pdf" under ../../mental_docs/ with PDFMinerLoader
# (PyPDFLoader was the previously tried alternative for loader_cls).
pdf_loader_options = dict(
    glob="*.pdf",
    show_progress=True,
    loader_cls=PDFMinerLoader,
)
loader = DirectoryLoader("../../mental_docs/", **pdf_loader_options)
docs = loader.load()
print(f"Loaded {len(docs)} documents")

In [None]:
# Debug
# for doc in docs:
#     print("-------------------------------------------------")
#     print(doc.metadata)
#     print(len(doc.page_content))
#     print(doc.page_content[:100])

In [None]:
import pandas as pd

# Split the loaded documents into chunks of 1000 characters with a
# 200-character overlap between consecutive chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
splits = text_splitter.split_documents(docs)

# Embedding model, reached through the configured API key and base URL
embedding = OpenAIEmbeddings(
    openai_api_key=OPENAI_API_KEY,
    openai_api_base=BASE_URL,
    model="text-embedding-3-small"
)

# Embed the chunks and store them in a Chroma vector store
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embedding
)

In [None]:
# Retriever that reads the relevant chunks back out of the vector store
retriever = vectorstore.as_retriever()

# Pull the "rlm/rag-prompt" template; it is placed in front of the LLM so that
# the user's prompt is sent together with the retrieved context.
# NOTE(review): the recorded output of the import cell shows
# "ImportError: cannot import name 'hub' from 'langchain'", so `hub` is not
# available in the recorded environment and this call cannot run as-is.
rag_prompt = hub.pull("rlm/rag-prompt")

In [None]:
def format_docs(docs):
    """Concatenate the ``page_content`` of every document, separated by a blank line.

    Args:
        docs: Iterable of objects exposing a ``page_content`` string attribute.

    Returns:
        A single string with the page contents joined by ``"\\n\\n"``.
    """
    page_texts = [doc.page_content for doc in docs]
    separator = "\n\n"
    return separator.join(page_texts)

In [None]:
# RAG pipeline: the incoming question is passed through unchanged and is also
# used to retrieve context, which format_docs flattens into one string; both
# are fed to the prompt, then the LLM, and the reply is parsed to a plain string.
rag_inputs = {"context": retriever | format_docs, "question": RunnablePassthrough()}
rag_chain = rag_inputs | rag_prompt | llm | StrOutputParser()

In [None]:
# Build the prompt. It is passed to rag_chain as the question and asks the
# model to answer with a list literal of alternating A/B utterances.
prompt_txt = """臨床心理士が行うメンタルヘルスケアカウンセリングをシミュレーションし、その対話内容に相槌を含め、話し言葉のまま文字起こししてください。
会話は中途半端で終わらせず、きりが良い会話にしてください。
相槌は実際の対話を想定して細かく入れてください。
語感は固くならないようにしてください。

形式は以下のようにしてください。Aがカウンセラーで、Bがカウンセリングを受ける人です。
以下の例は３回しか言葉を交わしていませんが、500文字程度の会話になるようにしてください。
カウンセリングで話す悩みは仕事以外にも考えうる様々なテーマすべて取り扱ってください。
文字列内に「A: 」のような誰が話したのかを明記する必要はありません。

[
 "Aが話す言葉",
 "Bが話す言葉",
 "Aが話す言葉",
 ...
]
"""

In [None]:
# Text dialogue generation
def gen_txt_dialogue():
    """Run the RAG chain on ``prompt_txt`` and return the generated dialogue text."""
    dialogue_txt = rag_chain.invoke(prompt_txt)
    return dialogue_txt

# Convert the generated dialogue text into a Python list of utterances
def txt_to_lst(dialogue_txt):
    """Parse the model's reply, expected to be a Python list literal.

    A reply wrapped in a Markdown code fence (with or without a language tag
    on the opening line) has the fence stripped before parsing, so such
    replies no longer fail.

    Args:
        dialogue_txt: Raw text returned by the model.

    Returns:
        The value produced by ``ast.literal_eval`` on the cleaned text.

    Raises:
        SyntaxError, ValueError: If the cleaned text is not a valid literal.
    """
    cleaned = dialogue_txt.strip()
    if cleaned.startswith("```"):
        # Drop the whole opening fence line, including an optional language tag.
        _, _, cleaned = cleaned.partition("\n")
        cleaned = cleaned.rstrip()
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
    return ast.literal_eval(cleaned.strip())

In [None]:
#DEBUG
# txt_dialogue = gen_txt_dialogue()
# print(txt_dialogue)
# lst_dialogue = txt_to_lst(txt_dialogue)
# print(lst_dialogue)

## テキスト対話データを音声対話データに変換 

In [None]:
from style_bert_vits2.nlp import bert_models
from style_bert_vits2.constants import Languages
from pathlib import Path
from huggingface_hub import hf_hub_download
from style_bert_vits2.tts_model import TTSModel

# Load the Japanese BERT model and tokenizer used by Style-Bert-VITS2
bert_models.load_model(Languages.JP, "ku-nlp/deberta-v2-large-japanese-char-wwm")
bert_models.load_tokenizer(Languages.JP, "ku-nlp/deberta-v2-large-japanese-char-wwm")

# Alternative voices (commented out); exactly one model/config/style/repo set
# should be active at a time.
# # Koharune Ami
# model_file = "koharune-ami/koharune-ami.safetensors"
# config_file = "koharune-ami/config.json"
# style_file = "koharune-ami/style_vectors.npy"
# hf_repo = "litagin/sbv2_koharune_ami"

# # Amitaro
# model_file = "amitaro/amitaro.safetensors"
# config_file = "amitaro/config.json"
# style_file = "amitaro/style_vectors.npy"
# hf_repo = "litagin/sbv2_amitaro"

# Default female voice 2
model_file = "jvnv-F2-jp/jvnv-F2_e166_s20000.safetensors"
config_file = "jvnv-F2-jp/config.json"
style_file = "jvnv-F2-jp/style_vectors.npy"
hf_repo = "litagin/style_bert_vits2_jvnv"

# Download the three model assets into ./model_assets
for file in [model_file, config_file, style_file]:
    print(file)
    hf_hub_download(hf_repo, file, local_dir="model_assets")

assets_root = Path("model_assets")

# Build the TTS model from the downloaded assets on the CUDA device
model = TTSModel(
    model_path=assets_root / model_file,
    config_path=assets_root / config_file,
    style_vec_path=assets_root / style_file,
    device="cuda",
)

In [None]:
def sbv_tts(text: str, assist_text=None):
    """Synthesize speech for ``text`` with the module-level TTS ``model``.

    Args:
        text: Text to synthesize.
        assist_text: Optional auxiliary text forwarded to ``model.infer``.
            Assist-text mode is enabled only when this is not ``None``.

    Returns:
        The ``(sr, audio)`` pair returned by ``model.infer``.
    """
    sr, audio = model.infer(
        text=text,
        style='Neutral',
        style_weight=1,
        split_interval=0.3,
        # Pass a real bool; the previous expression yielded None (not False)
        # when no assist text was supplied.
        use_assist_text=assist_text is not None,
        assist_text=assist_text,
    )
    return sr, audio

In [None]:
def lst_to_audio_dialogue(lst_dialogue):
    """Synthesize a two-speaker dialogue as a stereo waveform.

    Utterances are synthesized in order and laid out back to back in time.
    Even-indexed utterances (speaker A) go to channel 0 and odd-indexed ones
    (speaker B) to channel 1; the other channel stays silent for that span.

    Args:
        lst_dialogue: Sequence of utterance strings, alternating A, B, A, ...

    Returns:
        A float32 array of shape ``(total_samples, 2)`` at ``setting_sr``.
    """
    # Synthesize every utterance and keep the waveforms in memory (no files).
    wav_data = []
    for text in lst_dialogue:
        sr, wav = sbv_tts(text)
        wav = np.asarray(wav)

        # NOTE(review): the TTS model appears to return integer PCM - confirm.
        # Integer samples are scaled to [-1, 1) so that resampling (which
        # needs floating-point input) and the float32 mix below are valid.
        # Floating-point input is left untouched.
        if np.issubdtype(wav.dtype, np.signedinteger):
            wav = wav.astype(np.float32) / float(abs(np.iinfo(wav.dtype).min))

        # Convert to the configured sampling rate when the model's differs.
        if sr != setting_sr:
            wav = librosa.resample(wav, orig_sr=sr, target_sr=setting_sr)
        wav_data.append(wav)

    # The utterances do not overlap, so the total length is the sum of lengths.
    total_len = sum(len(w) for w in wav_data)

    # Zero-initialized stereo buffer: 2 channels x total length.
    stereo = np.zeros((2, total_len), dtype=np.float32)

    pos = 0
    for i, wav in enumerate(wav_data):
        ch = i % 2  # 0: left (A), 1: right (B)
        stereo[ch, pos:pos + len(wav)] += wav
        pos += len(wav)

    # Transpose to (samples, channels).
    return stereo.T

## MFA (Montreal Forced Aligner) による音声アラインメント

In [None]:
import MeCab
import re

# Matches a token that consists only of punctuation characters
PUNCT_RE = re.compile(r'^[。、,.!?！？…]+$')

def tokenize_text(text, is_punct_isolated=False):
    """Tokenize ``text`` with MeCab, optionally gluing punctuation to the previous token.

    Args:
        text: Text to tokenize.
        is_punct_isolated: When False (default), a punctuation-only token is
            appended to the preceding token instead of becoming its own token,
            provided a preceding token exists. When True, punctuation is kept
            as separate tokens.

    Returns:
        ``(tokens, punct_dict)``. ``tokens`` is the list of surface strings.
        ``punct_dict`` maps the running character offset (sum of the surface
        lengths seen so far) at which a merged punctuation token starts to
        that punctuation string.

    A ``RuntimeError`` raised by MeCab is reported on stderr and whatever was
    collected up to that point is returned.
    """
    tokens = []
    punct_dict = {}
    checked_punct_pos = 0
    try:
        # A new tagger is created on every call.
        tagger = MeCab.Tagger()

        # parseToNode yields a linked list of node objects; walk it via .next.
        node = tagger.parseToNode(text)
        while node:
            surface = node.surface
            # Nodes with an empty surface are skipped.
            if surface:
                merge_into_previous = (
                    not is_punct_isolated and PUNCT_RE.match(surface) and tokens
                )
                if merge_into_previous:
                    # Record where the punctuation starts, then attach it to
                    # the previous token.
                    punct_dict[checked_punct_pos] = surface
                    tokens[-1] += surface
                else:
                    # Ordinary token (or punctuation with nothing before it).
                    tokens.append(surface)
                checked_punct_pos += len(surface)
            node = node.next
    except RuntimeError as e:
        print(f"MeCabの実行中にエラーが発生しました: {e}", file=sys.stderr)

    return tokens, punct_dict

In [None]:
def generate_txt_file_using_mecab(input_txt, path):
    """Tokenize ``input_txt`` and write one token per line to ``path`` (UTF-8).

    Args:
        input_txt: Text to tokenize with ``tokenize_text``.
        path: Destination text file; overwritten if it exists.

    Returns:
        The ``(tokens, punct_dict)`` pair produced by ``tokenize_text``.
    """
    tokens, punct_dict = tokenize_text(input_txt)
    # Every token, including the last, is followed by a newline.
    file_body = "".join(token + "\n" for token in tokens)

    with open(path, "w", encoding="utf-8") as f:
        f.write(file_body)
    return tokens, punct_dict

In [None]:
from os.path import join, expanduser
import subprocess
import json

def alignment_channel(channel, txt, target_dir_name, sr=None):
    """Run MFA forced alignment for one speaker channel.

    Writes the channel audio and its tokenized transcript into
    ``<mfa_input_dir>/<target_dir_name>/`` and runs ``mfa align`` with JSON
    output into ``<mfa_output_dir>/<target_dir_name>/``.

    Args:
        channel: Mono samples of one speaker.
        txt: Full transcript of that speaker.
        target_dir_name: Name used for the per-channel sub-directories and as
            the base name of the ``.wav`` / ``.txt`` files.
        sr: Sampling rate of ``channel``. Defaults to ``setting_sr``. This was
            previously read from an undefined name ``sr``, which raised
            NameError on a fresh kernel.

    Returns:
        The punctuation dictionary produced while tokenizing ``txt``.

    Raises:
        subprocess.CalledProcessError: If ``mfa align`` exits with a non-zero
            status.
    """
    if sr is None:
        sr = setting_sr

    input_dir_path = join(mfa_input_dir, target_dir_name)
    output_dir_path = join(mfa_output_dir, target_dir_name)
    os.makedirs(input_dir_path, exist_ok=True)
    os.makedirs(output_dir_path, exist_ok=True)

    for_align_audio_path = join(input_dir_path, f"{target_dir_name}.wav")
    for_align_txt_path = join(input_dir_path, f"{target_dir_name}.txt")

    sf.write(for_align_audio_path, channel, sr)
    _, punct_dict = generate_txt_file_using_mecab(txt, for_align_txt_path)
    # check=True surfaces an aligner failure here instead of as a later
    # "file not found" when the output JSON is read.
    subprocess.run(
        [
            "mfa",
            "align",
            input_dir_path,
            "japanese_mfa",
            model_dir,
            output_dir_path,
            "--verbose",
            "--override",
            "--clean",
            "--output_format", "json",
            "--use_mp",
            "--beam", "1000",
            "--retry_beam", "4000",
            "--punctuation", "…",
        ],
        check=True,
    )
    return punct_dict

def json_formatter_for_ft(align_json_A, align_json_B):
    """Merge the word alignments of both speakers into one time-ordered list.

    Each input must provide ``["tiers"]["words"]["entries"]``, a list of
    entries indexed as ``[start, end, word]``.

    Args:
        align_json_A: Alignment data for speaker A.
        align_json_B: Alignment data for speaker B.

    Returns:
        A list of ``{"speaker", "word", "start", "end"}`` dicts sorted by
        ``start``. The sort is stable, so on equal start times speaker A's
        entry comes first.
    """
    def _to_records(align_json, speaker):
        # One record per word entry, tagged with the speaker label.
        return [
            {
                "speaker": speaker,
                "word": entry[2],
                "start": entry[0],
                "end": entry[1],
            }
            for entry in align_json["tiers"]["words"]["entries"]
        ]

    records = _to_records(align_json_A, "A") + _to_records(align_json_B, "B")
    records.sort(key=lambda seg: seg["start"])
    return records

def lst_to_line_str(lst):
    """Concatenate the strings in ``lst`` into one string with no separator."""
    return "".join(lst)
    
def alignment_audio_dialogue(lst_dialogue, audio_path, idx):
    """Align a stereo dialogue recording and return word-level timings.

    The two channels are aligned separately against the concatenated
    utterances of their speaker, and the two MFA JSON outputs are merged.

    Args:
        lst_dialogue: Utterances alternating A, B, A, ... (even index = A).
        audio_path: Path of the stereo audio file for this dialogue.
        idx: Dialogue index, used to name the per-channel MFA directories
            (``A_<idx>`` and ``B_<idx>``).

    Returns:
        The merged, time-sorted list built by ``json_formatter_for_ft``.
    """
    # Split the stereo file; speaker A is assumed to be on the left channel
    # (0) and speaker B on the right channel (1).
    audio, _ = sf.read(audio_path)    # assumed (samples, channels)
    channel_A = audio[:, 0]
    channel_B = audio[:, 1]

    # Even-indexed utterances belong to A, odd-indexed ones to B.
    A_full_txt = lst_to_line_str(lst_dialogue[0::2])
    B_full_txt = lst_to_line_str(lst_dialogue[1::2])

    target_dir_name_A = f"A_{idx}"
    target_dir_name_B = f"B_{idx}"
    # The returned punctuation dictionaries are not used here.
    alignment_channel(channel_A, A_full_txt, target_dir_name_A)
    alignment_channel(channel_B, B_full_txt, target_dir_name_B)

    json_path_A = join(mfa_output_dir, target_dir_name_A, f"{target_dir_name_A}.json")
    json_path_B = join(mfa_output_dir, target_dir_name_B, f"{target_dir_name_B}.json")
    # Read explicitly as UTF-8 so Japanese words decode the same way under
    # any locale.
    with open(json_path_A, "r", encoding="utf-8") as f:
        json_A = json.load(f)
    with open(json_path_B, "r", encoding="utf-8") as f:
        json_B = json.load(f)

    return json_formatter_for_ft(json_A, json_B)

## フォルダ初期化

In [None]:
import re

def get_file_name():
    """Return the largest N among files named ``N.wav`` in ``audio_dir_path``.

    Returns:
        The highest numeric file stem found, or -1 when no such file exists.
    """
    wav_file_pattern = r"^(\d+)\.wav$"
    num = -1
    for file in os.listdir(audio_dir_path):
        # Skip entries whose path does not resolve.
        if not os.path.exists(os.path.join(audio_dir_path, file)):
            continue

        match_obj = re.match(wav_file_pattern, file)
        if match_obj is None:
            continue

        num = max(num, int(match_obj.group(1)))
    return num

In [None]:
from glob import glob
import shutil

def delete_files(dir_path):
    """Empty ``dir_path`` by removing it recursively and re-creating it.

    A directory that does not exist yet is simply created; previously
    ``shutil.rmtree`` raised in that case.

    Args:
        dir_path: Directory to reset.
    """
    if os.path.isdir(dir_path):
        shutil.rmtree(dir_path)
    os.makedirs(dir_path, exist_ok=True)

# Decide the starting file number: continue after the highest existing wav
# when old data is kept, otherwise wipe every base directory and start from 0.
if not IS_REMOVE_EXIST_FILE:
    file_name_num = get_file_name()
else:
    file_name_num = -1
    for dir_path in base_paths:
        delete_files(dir_path)

## メイン処理

In [None]:
# Upper bound on LLM generation attempts per dialogue, so that a model which
# keeps returning unusable output cannot loop (and bill) forever.
max_gen_attempts = 10

for i in range(file_name_num+1, gen_dial_num+file_name_num+1):

    # The LLM does not always answer in the requested list format, so retry
    # until the reply parses into a non-empty list of strings.
    lst_dialogue = None
    for _attempt in range(max_gen_attempts):
        try:
            txt_dialogue = gen_txt_dialogue()
            candidate = txt_to_lst(txt_dialogue)
        except (SyntaxError, ValueError, TypeError):
            # ast.literal_eval raises these on malformed literals.
            continue
        if (
            isinstance(candidate, list)
            and candidate
            and all(isinstance(utterance, str) for utterance in candidate)
        ):
            lst_dialogue = candidate
            break
    if lst_dialogue is None:
        raise RuntimeError(
            f"No valid dialogue list was generated after {max_gen_attempts} attempts"
        )

    stereo = lst_to_audio_dialogue(lst_dialogue)

    wav_name = f"{i}.wav"
    audio_file_path = os.path.join(audio_dir_path, wav_name)

    # Write the wav file. lst_to_audio_dialogue resamples to setting_sr, so
    # that is the rate to write; the previous `sr` was an undefined name here.
    sf.write(audio_file_path, stereo, setting_sr)

    json_data = alignment_audio_dialogue(lst_dialogue, audio_file_path, i)

    json_name = f"{i}.json"
    json_file_path = os.path.join(json_dir_path, json_name)

    # Write the JSON transcription
    with open(json_file_path, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=2)