# FT用データ生成スクリプト

In [31]:
# !conda install -y -c conda-forge kalpy \
# kaldi \
# pynini \
# sox \
# ffmpeg \
# portaudio

# # パッケージインストール
# !cd /nfs1/s1f102201582/projects/fish-speech && \
# pip install -e .[cu128]
# !pip install -r /nfs1/s1f102201582/projects/mhcc-moshi/moshi/generate_data/requirements.fs.txt

In [32]:
# # mfa
# # 日本語辞書のダウンロード
# !mfa model download dictionary japanese_mfa

# # 日本語音響モデルのダウンロード
# !mfa model download acoustic japanese_mfa

In [33]:
import os
from os.path import join, expanduser
from typing import Literal
from glob import glob
import shutil
import ast
import asyncio
import random
import subprocess
import json
import re
import functools
import time

import numpy as np
import soundfile as sf
import librosa
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain_chroma import Chroma
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders import PDFMinerLoader
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents.middleware import dynamic_prompt, ModelRequest
from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy
from langchain_text_splitters import RecursiveCharacterTextSplitter


# .envファイル読み込み
load_dotenv("/users/s1f102201582/projects/mhcc-moshi/.env")

True

# config

In [34]:
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
BASE_URL = "https://api.openai.iniad.org/api/v1"
MODEL='gemini-2.5-flash'
TEMPERATURE = 1.0

cuda_devices = [0, 1, 2, 3]

# 生成する音声のサンプリングレート
setting_sr = 16000

#対話音声データの個数を指定
gen_dial_num = 1950

# 非同期で同時に生成するデータの最大数
MAX_SEMAPHORE = 12

# すでに作成した対話データを削除するかどうか
IS_REMOVE_EXIST_FILE = False

# tempファイルを削除するかどうか
IS_REMOVE_TMP = False

# エラーが起きたときの最大試行回数
MAX_RETRIES = 5

version = "fs/v1"

home_dir = expanduser("~")
# ftに使うjsonとaudioの出力フォルダパス
json_dir_path = join(home_dir, "projects/mhcc-moshi/moshi/data", version, "data_stereo")
audio_dir_path = join(home_dir, "projects/mhcc-moshi/moshi/data", version, "data_stereo")

# tempフォルダまでのパス
temp_dir_path = join(home_dir, "projects/mhcc-moshi/moshi/data", version, "temp")

# mfa関連のパス
model_dir = join(home_dir, "Documents/MFA/pretrained_models/acoustic/japanese_mfa.zip")
mfa_input_dir = join(temp_dir_path, "mfa_input")
mfa_output_dir = join(temp_dir_path, "mfa_output")

# fish-speech関連のパス
fs_tmp_path = join(temp_dir_path, "fs")

#RAGで読み取るPDFのパス
rag_pdf_dir = join(home_dir, "projects/mhcc-moshi/mental_docs/")

## フォルダ初期化 1

In [35]:
data_paths = [
    json_dir_path,
    audio_dir_path,
]

temp_paths = [
    fs_tmp_path,
    mfa_input_dir,
    mfa_output_dir,
]

def delete_files(dir_path):
    shutil.rmtree(dir_path)
    os.makedirs(dir_path)

# フォルダがない場合、新規作成
for p in [*data_paths, *temp_paths]:
    if not os.path.isdir(p):
        os.makedirs(p)

# IS_REMOVE_TMPがtrueの場合、tempファイルを削除
if IS_REMOVE_TMP:
    delete_files(temp_dir_path)

In [36]:
os_cuda_visible_devices = ""
for i in cuda_devices:
    os_cuda_visible_devices += str(i) + ","
os_cuda_visible_devices = os_cuda_visible_devices[:-1]

os.environ["CUDA_VISIBLE_DEVICES"] = os_cuda_visible_devices

## テキスト対話データ生成

In [37]:
# model定義
model = ChatGoogleGenerativeAI(
                 model=MODEL,
                 temperature=TEMPERATURE)

# 埋め込みモデル定義
embeddings = OpenAIEmbeddings(
    openai_api_key=OPENAI_API_KEY,
    openai_api_base=BASE_URL,
    model="text-embedding-3-large"
)

# データベース定義
vector_store = Chroma(
    collection_name="collection",
    embedding_function=embeddings,
    # persist_directory = "/path/to/db_file" # if necessary
)

In [38]:
loader = DirectoryLoader(
    rag_pdf_dir,
    glob="*.pdf",
    show_progress=True,
    loader_cls=PDFMinerLoader,
)
docs = loader.load()
print(f"Loaded {len(docs)} documents")

  0%|                                                                                     | 0/3 [00:00<?, ?it/s]Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
Cannot set g

Loaded 3 documents





In [39]:
# Debug
# for doc in docs:
#     print("-------------------------------------------------")
#     print(doc.metadata)
#     print(len(doc.page_content))
#     print(doc.page_content[:100])

In [40]:
#読み込んだ文章データをオーバーラップ200文字で1000文字づつ分割
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    add_start_index=True, # 分割前の文章のインデックスを追跡
)
splits = text_splitter.split_documents(docs)

# データベースにデータを追加
document_ids = vector_store.add_documents(documents=splits)

In [41]:
@dynamic_prompt
def prompt_with_context(request: ModelRequest) -> str:
    """Inject context into state messages."""
    last_query = request.state["messages"][-1].text
    retrieved_docs = vector_store.similarity_search(last_query, k=2)

    docs_content = "\n\n".join(doc.page_content for doc in retrieved_docs)

    system_message = (
        "You are a helpful assistant. Use the following context in your response:"
        f"\n\n{docs_content}"
    )

    return system_message

In [42]:
class Dialogue(BaseModel):
    """対話データを構成する対話クラス"""
    speaker: Literal["A", "B"] = Field(..., description="話者。Aはカウンセラー、Bはクライエントを表す。")
    raw_text: str = Field(..., description="FishAudioタグなしの話者が話した内容。")
    tag_text: str = Field(..., description="FishAudioタグありの話者が話した内容")

class Dialogues(BaseModel):
    """カウンセリングを目的としたカウンセリング対話データ"""
    dialogues: list[Dialogue] = Field(..., description="対話データを構成する対話クラスのリスト。")

In [43]:
agent = create_agent(
    model, 
    tools=[],
    middleware=[prompt_with_context],
    response_format=ToolStrategy(
        Dialogues,
        handle_errors="フォーマットに合うように、もう一度対話データを生成してください。"
    )
)

In [44]:
#promptを作成
prompt_template = """あなたは、Fish Audioによる音声合成（TTS）用の対話シナリオを作成するプロのライターです。
以下の【要件】、【設定】、そして【指示】に従い、対話データ（Dialogues）を生成してください。

### 【目的】
メンタルヘルスケアのカウンセリング対話をシミュレーションします。

### 【出力データ構造の定義】
* **speaker**: "A" (カウンセラー) または "B" (クライエント)
* **tag_text**: 文頭に必ずFish Audio用マーカー（例: `(sad)`) を付与したテキスト。
* **raw_text**: `tag_text`からマーカーを削除した純粋なテキスト。

### 【最重要要件：自然な会話の再現】
1.  **短いターン:** 一度の発話は短く区切る。長い独白は禁止。
2.  **インターラプト:** Bが長く話そうとしたら、Aが「(listening)うんうん」と短い相槌で挟む。
3.  **タグの使用:** 文頭に必ずタグをつける。

### 【禁止事項】
* **プレースホルダーの禁止:** テキスト中に「〇〇さん」「××さん」のような伏せ字・プレースホルダーを含めることは**厳禁**です。
* **カウンセラーの「知ったかぶり」禁止:** カウンセラー(A)が、クライアント(B)が口にする前に、前回の内容や宿題について言及することを禁止します。文脈は必ずBの発言によって作ってください。

### 【指示】

1.  **会話の開始と導入フェーズ（最重要）:**
    * **基本ルール:** カウンセラー(A)は、クライアント(B)の現在の状況や前回の詳細を**忘れている/知らない**ものとして振る舞ってください。
    
    * **状況が「初回」の場合:**
        * A: まず**名前**を尋ねる。 -> B: **具体的な苗字**（佐藤、鈴木など）を名乗る。
        * A: 「今日はどのようなことでお見えになりましたか？」と**来談理由**を尋ねる。
        * B: 指定された**【トピック】**についての悩みを話し始める。
        
    * **状況が「初回」以外の場合:**
        * A: 名前は呼ばずに挨拶し、「前回からいかがですか？」や「今日はどのようなお話をしましょうか？」と**完全にオープンな質問**をする。（※「宿題はどうでしたか？」などとAから特定の話題を振ることは禁止）。
        * B: Aの質問に答える形で、**【トピック】**や**【状況】**（宿題があったことや、前回の続きなど）を**自分から説明する**。
        * A: Bの説明を聞いて初めて、「ああ、そうでしたね」や「なるほど、そういう状況なのですね」と状況を把握・確認する。

    * **共通事項:**
        * AがBの話を聞いて状況を把握した**後**に、初めて具体的なカウンセリングへ移行してください。
        * 導入のヒアリング段階であっても、「短いターン」と「インターラプト」のルールは守ってください。

2.  **設定:**
    * クライアント(B)のペルソナ: {selected_persona}
    * トピック: {selected_topic}
    * 状況: {selected_situation}
    * 話し方指示: {selected_style_instruction}

3.  **会話の構成:**
    * 会話量は **4分程度** を目安とし、**25〜35往復（ターン）** 程度生成してください。
    * ペルソナの背景（年齢・職業）を反映させる（直接言わせず、内容で匂わせる）。
    * クライアント(B)の話し方指示（よどみ具合など）を忠実に守る。

出力は指定された `Dialogues` スキーマに従ってください。
"""

# トピックリスト（悩みの種類）(全40項目)
topic_list = [
  # 仕事・キャリア関連
  "仕事のプレッシャーや過労、バーンアウト",
  "職場の人間関係（上司、同僚、部下）",
  "キャリアプランの悩み、キャリアチェンジの不安",
  "転職・就職活動のストレス",
  "ハラスメント（パワハラ、モラハラなど）の影響",
  "仕事へのモチベーション低下、やる気が出ない",

  # 自己認識・感情関連
  "自己肯定感の低さ、自分を責めてしまう",
  "完璧主義、失敗への過度な恐れ",
  "他人の評価が過度に気になる、承認欲求",
  "劣等感（他人との比較）",
  "自分のやりたいことが分からない、アイデンティティの悩み",
  "感情のコントロールが難しい（怒り、イライラ、悲しみ）",
  "ネガティブ思考の癖、反芻思考（同じことをぐるぐる考えてしまう）",
  "焦燥感、何かに追われている感覚",
  "罪悪感（休むことへの罪悪感など）",
  "虚無感、むなしさ、生きがいを感じられない",
  "趣味や楽しみを感じられない（アンヘドニア）",
  
  # 対人関係
  "家族関係（親子、夫婦、兄弟、親戚）",
  "友人関係や恋愛関係の悩み",
  "コミュニケーションへの苦手意識（雑談、会議での発言など）",
  "人に頼ることができない、甘えられない",
  "他人の期待に応えすぎようとしてしまう（ピープルプリーザー）",
  "境界線（バウンダリー）の問題（NOと言えない）",
  "孤独感、疎外感",
  "HSP（繊細さ）に関する悩み",
  
  # 生活・健康関連
  "将来への漠然とした不安",
  "気分の落ち込み、無気力",
  "睡眠に関する悩み（寝付けない、途中で起きる、過眠）",
  "生活リズムの乱れ",
  "体調不良（頭痛、倦怠感、腹痛など）と気分の関連",
  "健康不安（病気への過度な心配）",

  # 習慣・行動関連
  "決断疲れ、何かを選ぶことができない",
  "先延ばし癖、物事が始められない",
  "SNS疲れ、デジタルデトックスの必要性",

  # 特定の出来事
  "ライフイベント（引っ越し、結婚、出産、育児、介護）に伴うストレス",
  "特定の出来事による軽いトラウマやフラッシュバック",
  "喪失体験（別れ、死別）からの回復（グリーフケア）",
  "過去の選択への後悔"
]

# 状況リスト（セッションの場面）
situation_list = [
  "初回",
  "初期",
  "中期（深掘り）",
  "中期（宿題の確認）",
  "中期（感情の表出）",
  "後期",
  "終了（終結）"
]

# クライアントのペルソナリスト（年齢と職業/立場のみ）
persona_list = [
    "20代・会社員",
    "30代・会社員",
    "40代・会社員",
    "50代・会社員",
    "20代・大学生",
    "20代・大学院生",
    "30代・管理職",
    "40代・主婦/主夫",
    "30代・フリーランス",
    "20代・アルバイト",
    "40代・パートタイム",
    "50代・自営業",
    "20代・求職中",
    "30代・育児休業中"
]

# Fish Audio用マーカーリスト (プロンプト埋め込み用)
fish_audio_markers = """
【基本感情】
(angry) (sad) (excited) (surprised) (satisfied) (delighted) 
(scared) (worried) (upset) (nervous) (frustrated) (depressed) 
(empathetic) (embarrassed) (disgusted) (moved) (proud) (relaxed) 
(grateful) (confident) (interested) (curious) (confused) (joyful)

【高度な感情】
(disdainful) (unhappy) (anxious) (hysterical) (indifferent) 
(impatient) (guilty) (scornful) (panicked) (furious) (reluctant) 
(keen) (disapproving) (negative) (denying) (astonished) (serious) 
(sarcastic) (conciliative) (comforting) (sincere) (sneering) 
(hesitating) (yielding) (painful) (awkward) (amused)

【トーン】
(in a hurry tone) (shouting) (screaming) (whispering) (soft tone)

【特殊効果】
(laughing) (chuckling) (sobbing) (crying loudly) (sighing) 
(panting) (groaning) (crowd laughing) (background laughter) (audience laughing)
"""

# 話し方指示（Fish Audioタグ指定付き）
speaking_style_data = {
    "よどみが多い（ためらいがち）": "話者Bの発話において、`(hesitating)` `(sighing)` などのタグや、フィラー（「えーと」「あのー」）を多用し、言葉に詰まる様子を表現してください。",
    "普通（自然な会話）": "文脈に合わせて適切な【基本感情】タグを文頭に付与し、自然な会話の範囲でフィラーを含めてください。",
    "スムーズ（よどみ少なめ）": "話者Bの発話は流暢にし、`(confident)` `(serious)` などのタグを用いて、しっかりとした口調を表現してください。"
}

def gen_prompt_txt():
    # ランダムに選択
    selected_topic = random.choice(topic_list)
    selected_situation = random.choice(situation_list)
    selected_persona = random.choice(persona_list)
    selected_style_name = random.choice(list(speaking_style_data.keys()))
    selected_style_instruction = speaking_style_data[selected_style_name]

    # print("選ばれたトピック:", selected_topic)
    # print("選ばれた状況:", selected_situation)
    # print("選ばれたペルソナ:", selected_persona)
    # print("選ばれたクライアントのスタイル:", selected_style_name)

    # 変数をプロンプトに埋め込む
    final_prompt = prompt_template.format(
        selected_persona=selected_persona,
        selected_topic=selected_topic,
        selected_situation=selected_situation,
        selected_style_name=selected_style_name,
        selected_style_instruction=selected_style_instruction,
        fish_audio_markers=fish_audio_markers
    )
    
    return final_prompt

In [45]:
# テキスト対話生成関数
async def gen_txt_dialogue():
    prompt = gen_prompt_txt()
    
    resp = await agent.ainvoke({"messages": [{"role": "user", "content": prompt}]})
    dialogues_list = resp["structured_response"].dialogues
    return dialogues_list

In [46]:
#DEBUG
# txt_dialogue = gen_txt_dialogue()
# print(txt_dialogue)
# lst_dialogue = txt_to_lst(txt_dialogue)
# print(lst_dialogue)

## テキスト対話データを音声対話データに変換 

In [47]:
def build_audio_synth_prompt(text_dialogue_list):
    resp = ""
    resp_header =  """あなたがこれから音声合成するテキストは以下の対話内容のワンフレーズです。
この対話の文脈に合うように音声合成してください。

<対話内容の全文>"""
    resp += resp_header
    for text_dial in text_dialogue_list:
        resp += f"\n{text_dial.speaker}: {text_dial.raw_text}"
    return resp

In [48]:
prompt_tokens_path = os.path.join(home_dir, "projects/mhcc-moshi/moshi/fake.npy")

async def tts(text: str, idx: int, idx_in_dial: int, speaker: Literal["A", "B"], prompt_text=None):
    print(f"[{idx}-{idx_in_dial}] の意味トークンの生成を開始します")
    
    cuda_i = idx % len(cuda_devices)
    output_dir = os.path.join(fs_tmp_path, f"{idx}-{idx_in_dial}")
    cmd = f"""python /nfs1/s1f102201582/projects/fish-speech/fish_speech/models/text2semantic/inference.py --text "{text}" --prompt-text "{prompt_text}" --prompt-tokens {prompt_tokens_path} --output-dir {output_dir} --device "cuda:{cuda_devices[cuda_i]}" --checkpoint-path /nfs1/s1f102201582/projects/fish-speech/checkpoints/openaudio-s1-mini/ """
    
    process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    print(f"[{idx}-{idx_in_dial}] の意味トークンの生成が完了しました")
    print(f"[{idx}-{idx_in_dial}] の音声の生成を開始します")

    semantic_path = os.path.join(output_dir, "codes_0.npy")
    out_path = os.path.join(output_dir, "fake.wav")
    cmd = f"""python /nfs1/s1f102201582/projects/fish-speech/fish_speech/models/dac/inference.py -i {semantic_path} --output-path {out_path} --checkpoint-path /nfs1/s1f102201582/projects/fish-speech/checkpoints/openaudio-s1-mini/codec.pth"""
    
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    print(f"[{idx}-{idx_in_dial}] の音声の生成が完了しました")
    
    audio, sr = librosa.load(out_path, sr=setting_sr)
    return audio, sr

In [49]:
async def gen_audio_dialogue(idx, text_dialogue_list, prompt):
    # 音声ファイルを順番に生成（ファイルは不要なのでwave配列で持つ）
    wav_data = []
    for idx_in_dial, dial in enumerate(text_dialogue_list):
        speaker = dial.speaker
        wav, sr = await tts(dial.tag_text, idx, idx_in_dial, speaker, prompt)

        # サンプリングレートを変換
        if sr != setting_sr:
            # 16ビット整数のデータを、-1.0から1.0の範囲に収まる浮動小数点数に正規化
            wav = wav.astype(np.float32) / 32768.0
            wav = librosa.resample(wav, orig_sr=sr, target_sr=setting_sr)

        # 0.3秒間の無音時間を追加
        duration_sec = 0.3
        num_silent_samples = int(setting_sr*duration_sec)
        silence = np.zeros(num_silent_samples, dtype=wav.dtype)
        wav_with_silence = np.concatenate((wav, silence))
        wav_data.append(wav_with_silence)
    
    # 最終的な音声長を決定
    max_len = sum([len(w) for w in wav_data])
    
    # ステレオ音声用（2チャンネル×最大長）の空配列をゼロ初期化で作成
    stereo = np.zeros((2, max_len), dtype=np.float32)
    
    pos = 0
    for i, wav in enumerate(wav_data):
        ch = i%2  # 0:左(A), 1:右(B)
        stereo[ch, pos:pos+len(wav)] += wav
        pos += len(wav)
    
    # 転置(-1,2)する
    stereo = stereo.T
    return stereo

## mfa(montreal force alignment)による音声アラインメント

In [50]:
import copy

def correct_json(full_text, align_json):
    new_align_json = copy.deepcopy(align_json)
    segments = new_align_json["tiers"]["words"]["entries"]
    checked_len = 0
    prev_checked_len = 0
    i = 0
    while i < len(segments):
        if re.search(f"^<unk>|<sil>$", segments[i][2]):
            if i == 0:
                if re.search(f"^<unk>|<sil>$", segments[i+1][2]):
                    end_time = 0
                    while re.search(f"^<unk>|<sil>$", segments[i+1][2]):
                        end_time = segments[i+1][1]
                        segments.pop(i+1)
                    segments[i][1] = end_time
                
                m = re.search(f"^(.+?){segments[i+1][2]}", full_text[checked_len:])
                match_text = m.groups()
                segments[i][2] = match_text[0]
            elif i == len(segments)-1:
                m = re.search(f"{segments[i-1][2]}(.+?)$", full_text[checked_len:])
                match_text = m.groups()
                segments[i][2] = match_text[0]
            else:
                if re.search(f"^<unk>|<sil>$", segments[i+1][2]):
                    end_time = 0
                    while re.search(f"^<unk>|<sil>$", segments[i+1][2]):
                        end_time = segments[i+1][1]
                        segments.pop(i+1)
                    segments[i][1] = end_time
                m = re.search(f"^{segments[i-1][2]}(.+?){segments[i+1][2]}", full_text[prev_checked_len:])
                match_text = m.groups()
                segments[i][2] = match_text[0]
        else:
            if re.search(f"^([。、,.!?！？…「」]){segments[i][2]}.*$", full_text[checked_len:]):
                m = re.search(f"^([。、,.!?！？…「」]){segments[i][2]}.*$", full_text[checked_len:])
                match_punc = m.groups()
                segments[i][2] = match_punc[0] + segments[i][2]
            elif re.search(f"^{segments[i][2]}([。、,.!?！？…「」]).*$", full_text[checked_len:]):
                m = re.search(f"^{segments[i][2]}([。、,.!?！？…「」]).*$", full_text[checked_len:])
                match_punc = m.groups()
                segments[i][2] = segments[i][2] + match_punc[0]
                
        prev_checked_len = checked_len
        checked_len += len(segments[i][2])
        i += 1
    return new_align_json

In [51]:
mfa_lock = asyncio.Lock()

async def alignment_channel(channel, target_dir_name):
    input_dir_path = join(mfa_input_dir, target_dir_name)
    output_dir_path = join(mfa_output_dir, target_dir_name)
    os.makedirs(input_dir_path, exist_ok=True)
    os.makedirs(output_dir_path, exist_ok=True)

    cmd = f'mfa align --quiet --overwrite --clean --final_clean --output_format json {input_dir_path} "japanese_mfa" {model_dir} {output_dir_path} --beam 10000 --retry_beam 40000 '

    async with mfa_lock:
        process = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
    if stderr:
        print("stderr:", stderr.decode())
    if stdout:
        print("stdout:", stdout.decode())
    
    # subprocess.run([
    #     "mfa",
    #     "align",
    #     input_dir_path,
    #     "japanese_mfa",
    #     model_dir,
    #     output_dir_path,
    #     "--quiet",
    #     "--overwrite",
    #     "--clean",
    #     "--final_clean",
    #     "--output_format", "json",
    #     "--beam", "10000",
    #     "--retry_beam", "40000",
    # ])      

def parse_ft_json(json_data):
    result = {"alignments": []}

    segments = json_data["tiers"]["words"]["entries"]
    for segment in segments:
        result["alignments"].append([
            segment[2],
            [segment[0], segment[1]],
            "SPEAKER_MAIN"
        ])
    result["alignments"].sort(key=lambda x: x[1][0])
    return result

def write_dialogue_text(text_dialogue_list, out_file_path):
    oneline_text = ""
    result = ""
    for dial in text_dialogue_list:
        if dial.speaker == "A":
            result += dial.raw_text + "\n"
            oneline_text += dial.raw_text
    with open(out_file_path, "w") as f:
        f.write(result)
    return oneline_text

async def alignment_audio_dialogue(text_dialogue_list, audio_path, idx):
    target_dir_name = str(idx)
    target_dir = os.path.join(mfa_input_dir, target_dir_name)
    if not os.path.isdir(target_dir):
        os.makedirs(target_dir)

    target_text_file = os.path.join(target_dir, f"{idx}.txt")
    oneline_text = write_dialogue_text(text_dialogue_list, target_text_file)

    wav_name = f"{idx}.wav"
    src_wav_path = audio_path
    dist_wav_path = os.path.join(target_dir, wav_name)
    shutil.copy(src_wav_path, dist_wav_path)

    audio, sr = sf.read(audio_path)
    await alignment_channel(audio[:, 0], target_dir_name)
    
    json_path = os.path.join(mfa_output_dir, target_dir_name, f"{idx}.json")
    json_data = {}
    with open(json_path, "r") as f:
        json_data = json.load(f)

    try:
        correct_json_data = correct_json(oneline_text, json_data)
        ft_json = parse_ft_json(correct_json_data)
    except:
        print(f"jsonファイル {idx}.json の訂正に失敗しました。")
        ft_json= parse_ft_json(json_data)
    
    return ft_json

## フォルダ初期化 2

In [52]:
def get_file_name():
    wav_file_pattern = r"^(\d+)\.wav$"
    num = -1
    for file in os.listdir(audio_dir_path):
        if not os.path.exists(os.path.join(audio_dir_path, file)):
            continue
        if not re.match(wav_file_pattern, file):
            continue
    
        match_obj = re.match(wav_file_pattern, file)
        get_number = int(match_obj.groups()[0])
    
        if num < get_number:
            num = get_number
    return num

In [53]:
if IS_REMOVE_EXIST_FILE:
    file_name_num = -1
    for dir_path in data_paths:
        delete_files(dir_path)
else:
    file_name_num = get_file_name()

## メイン処理

In [54]:
class Processor:
    def __init__(self, wrapped_task):
        # 同時実行数を制限するセマフォ
        self.sem = asyncio.Semaphore(MAX_SEMAPHORE)
        
        # 全体の実行可否を制御するイベント
        self.is_healthy = asyncio.Event()
        self.is_healthy.set()
        
        # リカバリー処理が重複しないようにするためのロック
        self.recovery_lock = asyncio.Lock()
        self.wrapped_task = wrapped_task

    async def run_task(self, task_id):
        """個々のタスクを実行するラッパー"""
        
        # 1. システムが健康(is_healthy)になるまで待機
        #    エラー発生中はここで全ての新しいタスクが止まります
        await self.is_healthy.wait()

        async with self.sem:
            # セマフォ取得後、念のため再度チェック (待機中にエラーが起きた場合のため)
            if not self.is_healthy.is_set():
                await self.is_healthy.wait()

            try:
                return await self.wrapped_task(task_id)
            except Exception as e:
                if MAX_RETRIES <= 0:
                    raise e
                # エラー発生時の処理へ
                await self.handle_error_and_retry(task_id)

    async def handle_error_and_retry(self, task_id):
        """
        エラー発生時のリカバリー処理。
        他のタスクをブロックし、指数的バックオフでリトライする。
        """
        # リカバリー権限を取得 (同時に複数のエラーが起きても、処理するのは1つずつ)
        async with self.recovery_lock:
            print(f"--- [Stop] Task {task_id} がシステムを一時停止しました ---")
            
            # 2. 全体の実行をストップ (赤信号)
            self.is_healthy.clear()

            retry_count = 1
            while True:
                try:
                    # ( retry_count + 1 )^2分待つ
                    backoff_time = ((retry_count+1)**2)*60
                    # 指数的バックオフ待機
                    print(f"... Task {task_id}: リトライ待機中 ({backoff_time}s) ...")
                    await asyncio.sleep(backoff_time)

                    # リトライ実行
                    print(f"--- [Retry] Task {task_id}: 再実行中 ---")
                    await self.wrapped_task(task_id)
                    
                    # 3. 成功したらブロック解除 (青信号)
                    print(f"--- [Resume] Task {task_id}: 成功！ システムを再開します ---")
                    self.is_healthy.set()
                    break # ループを抜ける

                except Exception as e:
                    print(f"[x] Task {task_id}: リトライ失敗。次は {backoff_time}秒待ちます。")
                    if retry_count == MAX_RETRIES:
                        raise e
                    
                    retry_count += 1

In [55]:
async def gen_dialogue(task_id):
    print(f"{task_id}番目のデータの生成を開始しました。")
    # テキスト生成
    txt_dialogue_list = await gen_txt_dialogue()
    
    # 音声合成のためのプロンプト生成
    audio_synth_prompt = build_audio_synth_prompt(txt_dialogue_list)
    
    # 対話テキストを音声合成
    stereo = await gen_audio_dialogue(task_id, txt_dialogue_list, audio_synth_prompt)
    
    wav_name = f"{task_id}.wav"
    audio_file_path = os.path.join(audio_dir_path, wav_name)
    
    # wavファイル出力
    sf.write(audio_file_path, stereo, setting_sr)
    print(f"{task_id}番目のデータの生成が完了しました。")
    
    return txt_dialogue_list, audio_file_path, task_id

async def align(txt_dialogue_list, audio_file_path, i):
    try:
        print(f"{i}番目のアライメントデータの生成を開始しました。")
        
        # 音声アラインメント
        json_data = await alignment_audio_dialogue(txt_dialogue_list, audio_file_path, i)
    
        json_name = f"{i}.json"
        json_file_path = os.path.join(json_dir_path, json_name)
        
        # JSON出力
        with open(json_file_path, 'w', encoding='utf-8') as f:
            json.dump(json_data, f, ensure_ascii=False, indent=2)
        print(f"{i}番目のアライメントデータの生成が完了しました。")
    except Exception as e:
        print(f"{i}番目のアライメントデータの生成が失敗しました。")
        raise e

In [56]:
moc_text_dial_list = [Dialogue(speaker='A', raw_text='こんにちは。今日は、どのようなお話をしましょうか？', tag_text='(neutral)こんにちは。今日は、どのようなお話をしましょうか？'), Dialogue(speaker='B', raw_text='あ、はい。今日は、えっと、これで、あの、最終回ということなので...。なんだか、やっぱり、これで終わりかと思うと、少し寂しいような気もしてまして。', tag_text='(thinking)あ、はい。今日は、えっと、これで、あの、最終回ということなので...。なんだか、やっぱり、これで終わりかと思うと、少し寂しいような気もしてまして。'), Dialogue(speaker='B', raw_text='でも、前回お話した、こう、自分を責めてしまう気持ちとか、少しずつですけど、減ってきたような気がします。', tag_text='(sad)でも、前回お話した、こう、自分を責めてしまう気持ちとか、少しずつですけど、減ってきたような気がします。'), Dialogue(speaker='A', raw_text='うんうん。', tag_text='(listening)うんうん。'), Dialogue(speaker='A', raw_text='そうでしたね。終わりが近いと、やはり色々な気持ちが出てきますよね。ご自身を責めてしまう気持ちが減ってきたと感じられているのですね。', tag_text='(neutral)そうでしたね。終わりが近いと、やはり色々な気持ちが出てきますよね。ご自身を責めてしまう気持ちが減ってきたと感じられているのですね。'), Dialogue(speaker='B', raw_text='はい。以前は、些細なことでも「私が悪かったんだ」って、すぐに落ち込んじゃってましたから。', tag_text='(happy)はい。以前は、些細なことでも「私が悪かったんだ」って、すぐに落ち込んじゃってましたから。'), Dialogue(speaker='A', raw_text='うんうん。', tag_text='(listening)うんうん。'), Dialogue(speaker='B', raw_text='でも、ここに通い始めて、こう、自分の感情とか行動を、少し客観的に見られるようになったのが大きかったです。', tag_text='(thinking)でも、ここに通い始めて、こう、自分の感情とか行動を、少し客観的に見られるようになったのが大きかったです。'), Dialogue(speaker='A', raw_text='客観的に見られるようになった、というのは、具体的にどのような時に感じられましたか？', tag_text='(neutral)客観的に見られるようになった、というのは、具体的にどのような時に感じられましたか？'), Dialogue(speaker='B', raw_text='そうですね...。例えば、パート先でちょっとしたミスをした時とかに、前はもう、ずっと頭の中で「なんで私ってこんなにダメなんだろう」って、ぐるぐる考えてしまっていたんですけど。', tag_text='(thinking)そうですね...。例えば、パート先でちょっとしたミスをした時とかに、前はもう、ずっと頭の中で「なんで私ってこんなにダメなんだろう」って、ぐるぐる考えてしまっていたんですけど。'), Dialogue(speaker='A', raw_text='うんうん。', tag_text='(listening)うんうん。'), Dialogue(speaker='B', raw_text='最近は、「あ、これはミスしたけど、次は気をつけよう」って、ちょっと切り替えができるようになってきたというか。', tag_text='(neutral)最近は、「あ、これはミスしたけど、次は気をつけよう」って、ちょっと切り替えができるようになってきたというか。'), Dialogue(speaker='A', raw_text='なるほど。その「切り替えができるようになった」というのは、Bさんにとって大きな変化だったのですね。', tag_text='(neutral)なるほど。その「切り替えができるようになった」というのは、Bさんにとって大きな変化だったのですね。'), Dialogue(speaker='B', raw_text='はい、本当に。以前は、もう、そのミスしたこと自体で、一日の気分が台無しになってしまっていたので。', tag_text='(happy)はい、本当に。以前は、もう、そのミスしたこと自体で、一日の気分が台無しになってしまっていたので。'), Dialogue(speaker='A', raw_text='うんうん。', tag_text='(listening)うんうん。'), Dialogue(speaker='A', raw_text='ご自身の変化を実感されているのですね。素晴らしいことです。', tag_text='(neutral)ご自身の変化を実感されているのですね。素晴らしいことです。'), Dialogue(speaker='B', raw_text='でも、まだ、こう、たまに、本当に些細なことで、「あー、また私、やってしまった」って、一瞬だけですけど、やっぱりそう思ってしまうこともあって...。', tag_text='(thinking)でも、まだ、こう、たまに、本当に些細なことで、「あー、また私、やってしまった」って、一瞬だけですけど、やっぱりそう思ってしまうこともあって...。'), Dialogue(speaker='A', raw_text='うんうん。', tag_text='(listening)うんうん。'), Dialogue(speaker='A', raw_text='そうですよね。完全にゼロにするというのは、なかなか難しいことかもしれませんね。', tag_text='(neutral)そうですよね。完全にゼロにするというのは、なかなか難しいことかもしれませんね。'), Dialogue(speaker='B', raw_text='はい。だから、これからも、こう、自分で、あの、考えすぎないように、気をつけなきゃなって思ってます。', tag_text='(neutral)はい。だから、これからも、こう、自分で、あの、考えすぎないように、気をつけなきゃなって思ってます。'), Dialogue(speaker='A', raw_text='ご自身で、その考え方と上手に付き合っていく方法を見つけられているのですね。', tag_text='(neutral)ご自身で、その考え方と上手に付き合っていく方法を見つけられているのですね。'), Dialogue(speaker='B', raw_text='はい。先生と話していく中で、こう、「完璧じゃなくてもいいんだ」って思えるようになったのが、すごく、大きくて。', tag_text='(happy)はい。先生と話していく中で、こう、「完璧じゃなくてもいいんだ」って思えるようになったのが、すごく、大きくて。'), Dialogue(speaker='A', raw_text='うんうん。', tag_text='(listening)うんうん。'), Dialogue(speaker='A', raw_text='「完璧じゃなくてもいいんだ」という気づきは、Bさんにとって、どのような意味がありましたか？', tag_text='(neutral)「完璧じゃなくてもいいんだ」という気づきは、Bさんにとって、どのような意味がありましたか？'), Dialogue(speaker='B', raw_text='なんだか、肩の力が抜けた、というか...。全部を一人で抱え込まなくてもいいんだって、少し楽になれた気がします。', tag_text='(relief)なんだか、肩の力が抜けた、というか...。全部を一人で抱え込まなくてもいいんだって、少し楽になれた気がします。'), Dialogue(speaker='A', raw_text='それは良かったです。これからの生活の中で、また悩むことが出てきたとしても、今、Bさんが得られたその感覚を思い出して、ご自身を大切にしてくださいね。', tag_text='(neutral)それは良かったです。これからの生活の中で、また悩むことが出てきたとしても、今、Bさんが得られたその感覚を思い出して、ご自身を大切にしてくださいね。'), Dialogue(speaker='B', raw_text='はい...。本当に、ありがとうございました。最初は、まさか自分がカウンセリングに来るなんて思ってもみなくて...。', tag_text='(sad)はい...。本当に、ありがとうございました。最初は、まさか自分がカウンセリングに来るなんて思ってもみなくて...。'), Dialogue(speaker='A', raw_text='うんうん。', tag_text='(listening)うんうん。'), Dialogue(speaker='B', raw_text='でも、本当に来て良かったです。先生のおかげで、少し前向きになれました。', tag_text='(neutral)でも、本当に来て良かったです。先生のおかげで、少し前向きになれました。'), Dialogue(speaker='A', raw_text='Bさんが、ご自身のペースで、前向きに進んでいかれることを、私も応援しています。また何かありましたら、いつでも頼ってくださいね。', tag_text='(happy)Bさんが、ご自身のペースで、前向きに進んでいかれることを、私も応援しています。また何かありましたら、いつでも頼ってくださいね。'), Dialogue(speaker='B', raw_text='はい！ ありがとうございます。', tag_text='(happy)はい！ ありがとうございます。')]

moc_audio_file_path = join("/users/s1f102201582/projects/mhcc-moshi/moshi/data/mock/data_stereo/0.wav")

In [57]:
# # gen_dialogueのモック
# async def gen_dialogue(task_id):
#     wav_name = f"{task_id}.wav"
#     audio_file_path = os.path.join(audio_dir_path, wav_name)
#     shutil.copy(moc_audio_file_path, audio_file_path)

#     return moc_text_dial_list, audio_file_path, task_id

In [58]:
async def gen_data(task_id):
    text_dial_list, audio_file_path, task_id = await gen_dialogue(task_id)
    return await align(text_dial_list, audio_file_path, task_id)

In [59]:
async def main():
    processor = Processor(gen_data)
    all_tasks = []
    
    for i in range(file_name_num+1, gen_dial_num+file_name_num+1):
        cor = processor.run_task(i)
        task = asyncio.create_task(cor)
        all_tasks.append(task)
    await asyncio.gather(*all_tasks)

In [60]:
print(f"[{time.strftime('%X')}] asyncio.run(main()) を呼び出します")
start_run = time.time()
await main()
end_run = time.time()
print(f"[{time.strftime('%X')}] asyncio.run() が終了しました (実行時間: {end_run - start_run:.2f}s)")

[13:31:37] asyncio.run(main()) を呼び出します
0番目のアライメントデータの生成を開始しました。
stderr: [2;36m [0m[32mINFO    [0m Setting up corpus information[33m...[0m                                      
[2;36m [0m[32mINFO    [0m Loading corpus from source files[33m...[0m                                   
[2;36m [0m[32mINFO    [0m Found [1;36m1[0m speaker across [1;36m1[0m file, average number of utterances per       
[2;36m [0m         speaker: [1;36m1.0[0m                                                          
[2;36m [0m[32mINFO    [0m Initializing multiprocessing jobs[33m...[0m                                  
[2;36m [0m         MFA will only use [1;36m1[0m jobs. Use the --single_speaker flag if you would  
[2;36m [0m         like to split utterances across jobs regardless of their speaker.     
[2;36m [0m[32mINFO    [0m Normalizing text[33m...[0m                                                   
[2;36m [0m[32mINFO    [0m Generating MFCCs[33m...[0m         