In [1]:
'''
构造raw_data，等同于prepare_align.py
'''
import os
import sys
import argparse
import json
import yaml
import librosa
import numpy as np
from scipy.io import wavfile

sys.path.append('/home/you/workspace/son/FastSpeech2')
from text import _clean_text


def split_json_by_splitset(input_file, output_dir):
    """Split a label JSON file into one JSON file per ``Split_Set`` value.

    Args:
        input_file: Path to a JSON file holding a mapping of entry key -> metadata dict.
        output_dir: Directory receiving the ``<Split_Set>.json`` files; created if missing.

    Entries whose metadata has no ``Split_Set`` field are grouped under ``Unknown``.
    """
    os.makedirs(output_dir, exist_ok=True)

    with open(input_file, 'r', encoding='utf-8') as src:
        entries = json.load(src)

    # Bucket the entries by split name, keeping their original order inside each bucket.
    buckets = {}
    for name, meta in entries.items():
        buckets.setdefault(meta.get("Split_Set", "Unknown"), {})[name] = meta

    # Write one JSON file per bucket.
    for split_name in buckets:
        target = os.path.join(output_dir, f"{split_name}.json")
        with open(target, 'w', encoding='utf-8') as dst:
            json.dump(buckets[split_name], dst, indent=4, ensure_ascii=False)

    print(f"Data has been split and saved into {output_dir}")


def copy_files_from_list(config, part_file):
    """Build the FastSpeech2 ``raw_data`` tree for one data split.

    For every entry of ``<corpus_path>/Labels/<part_file>`` this function

    * loads ``<corpus_path>/Audios/<name>`` at the configured sampling rate,
      peak-normalises it to ``max_wav_value`` and writes it as int16 samples to
      ``<raw_path>/<SpkrID>/<name>``;
    * cleans ``<corpus_path>/Transcripts/<basename>.txt`` with the configured text
      cleaners and writes the result to ``<raw_path>/<SpkrID>/<basename>.lab``.

    Audio or transcript files that do not exist are skipped silently.

    Args:
        config: Parsed preprocess config. Reads ``path.corpus_path``, ``path.raw_path``,
            ``preprocessing.audio.sampling_rate``, ``preprocessing.audio.max_wav_value``
            and ``preprocessing.text.text_cleaners``.
        part_file: File name of the split's label JSON inside ``Labels``
            (e.g. ``Train.json``). When it is missing, the per-split files are first
            generated from ``Labels/labels_consensus.json``.
    """
    in_dir = config["path"]["corpus_path"]
    out_dir = config["path"]["raw_path"]
    sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]
    max_wav_value = config["preprocessing"]["audio"]["max_wav_value"]
    cleaners = config["preprocessing"]["text"]["text_cleaners"]

    # Loop-invariant corpus sub-directories
    labels_dir = os.path.join(in_dir, 'Labels')
    audio_dir = os.path.join(in_dir, 'Audios')
    transcript_dir = os.path.join(in_dir, 'Transcripts')

    data_part_path = os.path.join(labels_dir, part_file)
    if not os.path.exists(data_part_path):
        split_json_by_splitset(os.path.join(labels_dir, 'labels_consensus.json'), labels_dir)

    # Ensure the output directory exists
    os.makedirs(out_dir, exist_ok=True)

    # Load the split's label file (audio file name -> metadata)
    with open(data_part_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    epsilon = 1e-6  # small constant that keeps the peak normalisation away from a zero divisor

    for audio_file, metadata in data.items():
        speaker_id = metadata.get("SpkrID", "unknown")
        speaker_folder = os.path.join(out_dir, speaker_id)
        os.makedirs(speaker_folder, exist_ok=True)

        wav_path = os.path.join(audio_dir, audio_file)
        if os.path.exists(wav_path):
            # `sr` must be passed by keyword: recent librosa releases made every
            # argument after the path keyword-only, so the positional form fails.
            wav, _ = librosa.load(wav_path, sr=sampling_rate)
            wav = wav / (np.max(np.abs(wav)) + epsilon) * max_wav_value
            wav_output_path = os.path.join(speaker_folder, audio_file)
            wavfile.write(wav_output_path, sampling_rate, wav.astype(np.int16))

        base_name = os.path.splitext(audio_file)[0]
        transcript_path = os.path.join(transcript_dir, base_name + '.txt')
        if os.path.exists(transcript_path):
            with open(transcript_path, 'r', encoding='utf-8') as f:
                text = f.read()
            cleaned_text = _clean_text(text, cleaners)
            transcript_output_path = os.path.join(speaker_folder, base_name + ".lab")
            with open(transcript_output_path, 'w', encoding='utf-8') as f:
                f.write(cleaned_text)

    print(f"Files have been copied to {out_dir}")
        
        

# Load the preprocessing config. The context manager closes the file handle
# deterministically instead of leaving a bare open() to the garbage collector.
with open('/home/you/workspace/son/FastSpeech2/config/MSP/preprocess.yaml', "r") as config_file:
    config = yaml.load(config_file, Loader=yaml.FullLoader)
part_files = ['Development.json', 'Train.json', 'Test1.json', 'Test2.json']
part_file = part_files[1]  # 'Train.json'; the TextGrid-copy cell reads this global as well
copy_files_from_list(config, part_file)
# if __name__ == "__main__":
#     parser = argparse.ArgumentParser()
#     parser.add_argument("config", type=str, help="path to preprocess.yaml")
#     args = parser.parse_args()

#     config = yaml.load(open(args.config, "r"), Loader=yaml.FullLoader)
#     copy_files_from_list(config)

Files have been copied to /home/you/workspace/son/FastSpeech2/raw_data/MSP


In [2]:
import shutil
import yaml, json
def trans_textgrid(config, part_file):
    """Copy the TextGrid files belonging to one data split into the preprocessed tree.

    For every entry of ``<corpus_path>/Labels/<part_file>`` the file
    ``<corpus_path>/ForceAligned/<basename>.TextGrid`` is copied, when it exists, to
    ``<preprocessed_path>/TextGrid/<SpkrID>/``. The speaker directory is created even
    when no TextGrid is found for the entry.

    Args:
        config: Parsed preprocess config; reads ``path.corpus_path`` and
            ``path.preprocessed_path``.
        part_file: File name of the split's label JSON inside ``Labels``. When it is
            missing, the per-split files are generated from ``labels_consensus.json``.
    """
    corpus_dir = config["path"]["corpus_path"]
    textgrid_root = os.path.join(config["path"]["preprocessed_path"], 'TextGrid')

    labels_dir = os.path.join(corpus_dir, 'Labels')
    part_path = os.path.join(labels_dir, part_file)
    if not os.path.exists(part_path):
        split_json_by_splitset(os.path.join(labels_dir, 'labels_consensus.json'), labels_dir)

    os.makedirs(textgrid_root, exist_ok=True)

    with open(part_path, 'r', encoding='utf-8') as fh:
        entries = json.load(fh)

    aligned_dir = os.path.join(corpus_dir, 'ForceAligned')
    for audio_name, meta in entries.items():
        speaker_dir = os.path.join(textgrid_root, meta.get("SpkrID", "unknown"))
        os.makedirs(speaker_dir, exist_ok=True)

        tg_name = os.path.splitext(audio_name)[0] + '.TextGrid'
        source = os.path.join(aligned_dir, tg_name)
        if not os.path.exists(source):
            continue  # no alignment available for this utterance
        shutil.copy(source, os.path.join(speaker_dir, tg_name))
        

# Load the preprocessing config; the context manager closes the file handle
# instead of leaving a bare open() to the garbage collector.
with open('/home/you/workspace/son/FastSpeech2/config/MSP/preprocess.yaml', "r") as config_file:
    config = yaml.load(config_file, Loader=yaml.FullLoader)
# NOTE: `part_file` is not defined in this cell; it must already exist in the
# kernel (it is assigned by the raw_data construction cell).
trans_textgrid(config, part_file)

In [3]:
'''
textgrid文件有些是空的，会导致preprocess.py报错，这里检测并处理。
'''
import os
import tgt

# Input locations
textgrid_folder = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/TextGrid"
raw_data_folder = "/home/you/workspace/son/FastSpeech2/raw_data"

n = 0  # number of unreadable TextGrid files removed
for root, dirs, files in os.walk(textgrid_folder):
    for file in files:
        # Only alignment files are candidates for deletion; any other file that
        # happens to live in the tree must not be mistaken for a broken TextGrid.
        if not file.endswith(".TextGrid"):
            continue

        tg_path = os.path.join(root, file)
        try:
            # Try to parse the TextGrid file
            textgrid = tgt.io.read_textgrid(tg_path)
        except Exception as e:
            # Report the reason as well, so that a genuinely empty/corrupt file can
            # be told apart from an unrelated failure.
            print(f"Error processing TextGrid: {tg_path}: {e}")

            # Delete the unreadable TextGrid file
            os.remove(tg_path)
            print(f"Deleted TextGrid: {tg_path}")

            # Derive the utterance basename and the speaker id
            basename = os.path.splitext(file)[0]
            spkrID = os.path.basename(root)  # the containing directory is named after the speaker

            # Matching .wav and .lab files in the raw_data tree
            wav_path = os.path.join(raw_data_folder, 'MSP', spkrID, f"{basename}.wav")
            lab_path = os.path.join(raw_data_folder, 'MSP', spkrID, f"{basename}.lab")

            # Delete the .wav file
            if os.path.exists(wav_path):
                os.remove(wav_path)
                print(f"Deleted WAV: {wav_path}")

            # Delete the .lab file
            if os.path.exists(lab_path):
                os.remove(lab_path)
                print(f"Deleted LAB: {lab_path}")

            n += 1
print(n)


Error processing TextGrid: /home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/TextGrid/3258/MSP-PODCAST_5730_1301.TextGrid
Deleted TextGrid: /home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/TextGrid/3258/MSP-PODCAST_5730_1301.TextGrid
Deleted WAV: /home/you/workspace/son/FastSpeech2/raw_data/MSP/3258/MSP-PODCAST_5730_1301.wav
Deleted LAB: /home/you/workspace/son/FastSpeech2/raw_data/MSP/3258/MSP-PODCAST_5730_1301.lab
Error processing TextGrid: /home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/TextGrid/2430/MSP-PODCAST_5748_0066.TextGrid
Deleted TextGrid: /home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/TextGrid/2430/MSP-PODCAST_5748_0066.TextGrid
Deleted WAV: /home/you/workspace/son/FastSpeech2/raw_data/MSP/2430/MSP-PODCAST_5748_0066.wav
Deleted LAB: /home/you/workspace/son/FastSpeech2/raw_data/MSP/2430/MSP-PODCAST_5748_0066.lab
Error processing TextGrid: /home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/TextGrid/3234/MSP-PODCAST_5718_0589.TextG

In [4]:
'''获取最长序列长度'''
import os
import librosa

def get_max_seqlength(audio_folder, sampling_rate=22050):
    """Print the length and path of the longest ``.wav`` file under ``audio_folder``.

    Args:
        audio_folder: Root directory; all sub-directories are walked.
        sampling_rate: Rate passed to ``librosa.load`` as ``sr``; the length is the
            number of samples of the loaded signal.

    Files that fail to load are reported on stdout and ignored. Nothing is returned.
    """
    longest_samples = 0   # longest length seen so far, in samples
    longest_path = None   # path of the file with that length

    for dirpath, _subdirs, filenames in os.walk(audio_folder):
        for filename in filenames:
            if not filename.endswith(".wav"):
                continue  # only .wav files are measured
            candidate = os.path.join(dirpath, filename)
            try:
                samples, _ = librosa.load(candidate, sr=sampling_rate)
                n_samples = len(samples)
                if n_samples > longest_samples:
                    longest_samples, longest_path = n_samples, candidate
            except Exception as e:
                print(f"Error processing {candidate}: {e}")

    # Same length expressed in seconds
    duration = longest_samples / sampling_rate

    print(f"Max sequence length: {longest_samples} samples ({duration:.2f} seconds)")
    print(f"Longest file: {longest_path}")

# Example usage
audio_folder = "/home/you/workspace/son/FastSpeech2/raw_data/MSP"  # top-level directory; sub-directories are walked too
get_max_seqlength(audio_folder, sampling_rate=22050)


Max sequence length: 247042 samples (11.20 seconds)
Longest file: /home/you/workspace/son/FastSpeech2/raw_data/MSP/67/MSP-PODCAST_0285_0018.wav


In [5]:
'''删除非ARPAbet音素的textgrid文件'''
import os
import re
import tgt

# Regular expression for one ARPAbet phoneme:
# uppercase letters followed by an optional stress digit (0, 1 or 2).
ARPABET_PATTERN = re.compile(r"^[A-Z]+[0-2]?$")

def is_arpabet_sequence(text):
    """Return True when ``text`` matches ``ARPABET_PATTERN``, False otherwise."""
    matched = ARPABET_PATTERN.match(text)
    return bool(matched)

def delete_non_arpabet_textgrids(textgrid_folder, raw_data_root=None):
    """Delete TextGrid files whose ``phones`` tier holds non-ARPAbet labels.

    Every ``.TextGrid`` under ``textgrid_folder`` is parsed. When its ``phones`` tier
    contains a non-empty interval label rejected by ``is_arpabet_sequence``, the
    TextGrid and the matching ``<raw_data_root>/MSP/<speaker>/<basename>.wav`` /
    ``.lab`` files are removed. The speaker id is the name of the directory holding
    the TextGrid. Files without a ``phones`` tier are left alone.

    NOTE(review): lowercase labels such as ``sil``/``sp``/``spn`` do not match
    ``ARPABET_PATTERN`` as defined in this notebook, so files containing them are
    deleted too -- confirm that this is intended.

    Args:
        textgrid_folder: Root directory of the TextGrid tree.
        raw_data_root: Root of the raw_data tree. Defaults to the notebook-level
            ``raw_data_folder`` global, which keeps the original call working.

    Prints the total number of deleted TextGrid files; returns nothing.
    """
    if raw_data_root is None:
        # Resolve the global before anything is deleted: if it is missing we fail
        # here instead of after a TextGrid has already been removed.
        raw_data_root = raw_data_folder

    # One counter for the whole walk. It used to be reset for every directory, so the
    # summary only covered the last directory visited and was unbound when the walk
    # yielded nothing.
    num = 0
    for root, dirs, files in os.walk(textgrid_folder):
        for file in files:
            if not file.endswith(".TextGrid"):
                continue
            tg_path = os.path.join(root, file)
            try:
                # Load the TextGrid file
                textgrid = tgt.io.read_textgrid(tg_path)

                # Only files that have a "phones" tier are inspected
                if "phones" not in textgrid.get_tier_names():
                    continue
                tier = textgrid.get_tier_by_name("phones")

                # Empty labels are ignored; any other label must look like ARPAbet
                contains_non_arpabet = any(
                    not is_arpabet_sequence(interval.text.strip())
                    for interval in tier.intervals
                    if interval.text.strip()
                )
                if not contains_non_arpabet:
                    continue

                # Work out every path first, then delete
                basename = os.path.splitext(file)[0]
                spkrID = os.path.basename(root)  # containing directory name is the speaker id
                wav_path = os.path.join(raw_data_root, 'MSP', spkrID, f"{basename}.wav")
                lab_path = os.path.join(raw_data_root, 'MSP', spkrID, f"{basename}.lab")

                os.remove(tg_path)
                for companion in (wav_path, lab_path):
                    if os.path.exists(companion):
                        os.remove(companion)
                num += 1
            except Exception as e:
                print(f"Error processing {tg_path}: {e}")

    print(f"Deleted {num} non-ARPAbet TextGrid files.")


def get_alignment(tier):
    """Return the phone labels of ``tier`` with leading and trailing silences trimmed.

    Args:
        tier: Object exposing ``_objects``, an iterable of intervals that each carry a
            ``text`` label.

    Returns:
        List of labels from the first to the last non-silent phone. Silence labels
        (``sil``, ``sp``, ``spn``) between two phones are kept. An empty list is
        returned when the tier is empty or contains only silences.
    """
    sil_phones = ["sil", "sp", "spn"]

    phones = []
    # Length of `phones` up to and including the last non-silent phone. Starting at 0
    # makes an all-silence (or empty) tier yield [] instead of raising
    # UnboundLocalError.
    end_idx = 0
    for t in tier._objects:
        p = t.text

        if p in sil_phones:
            # Leading silences are dropped; silences after the first phone are kept
            # for now and trimmed at the end if nothing follows them.
            if phones:
                phones.append(p)
        else:
            # Ordinary phone
            phones.append(p)
            end_idx = len(phones)

    # Trim trailing silences
    return phones[:end_idx]

# Example usage
textgrid_folder = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/TextGrid/"
delete_non_arpabet_textgrids(textgrid_folder)


Deleted 0 non-ARPAbet TextGrid files.


In [5]:
'''生成train_labels.json'''
import os
import pandas as pd
import json

# ======== Adjust these paths to your environment ========
audio_root = "/home/you/workspace/son/FastSpeech2/raw_data/MSP"         # directory tree holding the wav files (sub-directories included)
csv_path = "/home/you/workspace/database/MSP/Labels/labels_consensus.csv"             # consensus label CSV
json_out_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/train_labels.json"          # output JSON file
# =========================================================

# 1. Collect the file name (extension included) of every .wav under audio_root
wav_basenames = []
for _dirpath, _subdirs, names in os.walk(audio_root):
    wav_basenames.extend(name for name in names if name.endswith(".wav"))

# 2. Load the label CSV
df = pd.read_csv(csv_path)

# 3. Keep only the rows whose FileName is one of the wav files found on disk
on_disk = df["FileName"].isin(wav_basenames)
filtered_df = df[on_disk]

# 4. Turn the rows into a dict keyed by FileName
indexed = filtered_df.set_index("FileName")
result_dict = indexed.T.to_dict()

# 5. Save as JSON
with open(json_out_path, "w", encoding="utf-8") as f:
    json.dump(result_dict, f, indent=4, ensure_ascii=False)

print(f"完成！共筛选出 {len(result_dict)} 条记录，已保存到：{json_out_path}")


完成！共筛选出 54786 条记录，已保存到：/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/train_labels.json


In [None]:
'''从train_labels.json提取emotions.json '''
import json
import os
import numpy as np

def generate_emotions_json(train_labels_path, output_path):
    """Derive ``emotions.json`` from a ``train_labels.json`` file.

    Args:
        train_labels_path: JSON mapping of file name -> label dict. The optional keys
            ``EmoClass``, ``EmoAct`` (arousal) and ``EmoVal`` (valence) are read.
        output_path: Destination of the generated JSON.

    The output holds ``emotion_dict`` (sorted class name -> index), ``arousal_dict`` /
    ``valence_dict`` (bin edge rounded to one decimal, as a string -> index) and the
    unrounded edges in ``arousal_bins`` / ``valence_bins``. A short summary is printed.
    """
    with open(train_labels_path, 'r', encoding='utf-8') as f:
        labels = json.load(f)

    # Gather the emotion information that is present in each label record
    records = list(labels.values())
    emotions = {rec['EmoClass'] for rec in records if 'EmoClass' in rec}
    arousals = [float(rec['EmoAct']) for rec in records if 'EmoAct' in rec]
    valences = [float(rec['EmoVal']) for rec in records if 'EmoVal' in rec]

    # Class name -> index, in alphabetical order
    emotion_dict = dict(zip(sorted(emotions), range(len(emotions))))

    # Binning of arousal and valence
    n_bins = 14  # kept in line with the model configuration (per the original author's note)
    arousal_min, arousal_max = min(arousals), max(arousals)
    valence_min, valence_max = min(valences), max(valences)

    # n_bins - 1 evenly spaced edges between the observed minimum and maximum
    arousal_bins = np.linspace(arousal_min, arousal_max, n_bins - 1)
    valence_bins = np.linspace(valence_min, valence_max, n_bins - 1)

    def _edge_lookup(edges):
        # Rounded edge (as a string) -> position of the edge
        lookup = {}
        for position, edge in enumerate(edges):
            lookup[str(round(edge, 1))] = position
        return lookup

    arousal_dict = _edge_lookup(arousal_bins)
    valence_dict = _edge_lookup(valence_bins)

    # Final emotions.json structure
    emotions_json = {
        "emotion_dict": emotion_dict,
        "arousal_dict": arousal_dict,
        "valence_dict": valence_dict,
        "arousal_bins": arousal_bins.tolist(),  # raw edges, stored for later use
        "valence_bins": valence_bins.tolist(),  # raw edges, stored for later use
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(emotions_json, f, indent=2)

    print("已生成 emotions.json，包含：")
    print(f"- 情绪类型数量: {len(emotion_dict)}")
    print(f"- 唤醒度分箱数量: {len(arousal_dict)}")
    print(f"- 效价分箱数量: {len(valence_dict)}")
    print(f"\n唤醒度范围: [{arousal_min:.1f}, {arousal_max:.1f}]")
    print(f"效价范围: [{valence_min:.1f}, {valence_max:.1f}]")

# Entry point: reads train_labels.json and writes emotions.json next to it in the
# same preprocessed_data/MSP directory.
if __name__ == "__main__":
    train_labels_path = '/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/train_labels.json'
    output_path = '/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/emotions.json'
    generate_emotions_json(train_labels_path, output_path) 

In [2]:
'''生成添加了情绪信息的train/val.txt，使用前改原文件名为train_orignal.txt/val_orignal.txt'''
import json

def round_half(x):
    """Round ``x`` to the nearest multiple of 0.5.

    The value is doubled, rounded with the built-in ``round`` and halved again, so
    ties follow the built-in's round-half-to-even rule.
    """
    doubled = x * 2
    return round(doubled) / 2

# ======= Adjust these paths to your environment =======
# Inputs are expected under the names train_original.txt / val_original.txt
# (the metadata lists renamed beforehand); outputs take the train.txt / val.txt names.
train_orignal_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/train_original.txt"
val_orignal_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/val_original.txt"
train_labels_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/train_labels.json"
output_train_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/train.txt"
output_val_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/val.txt"
# =====================================

# Load the emotion labels (train_labels.json); keys are looked up as "<utterance id>.wav"
with open(train_labels_path, "r", encoding="utf-8") as f:
    emotion_data = json.load(f)
def process_emotion_file(input_path, output_path, emotion_data):
    """Append emotion labels to every line of a ``|``-separated metadata file.

    Args:
        input_path (str): Source file (the original train/val list).
        output_path (str): Destination file (train.txt or val.txt).
        emotion_data (dict): Label dicts keyed by ``<utterance id>.wav``, where the
            utterance id is the first field of a line.

    Blank lines and lines with fewer than three fields are dropped. For a known
    utterance ``|EmoClass|EmoAct|EmoVal`` is appended, with the two numeric values
    passed through ``round_half`` (a missing value counts as 0). Unknown utterances
    are echoed to stdout and get ``|NA|NA|NA``.
    """
    with open(input_path, "r", encoding="utf-8") as fin, open(output_path, "w", encoding="utf-8") as fout:
        for raw in fin:
            line = raw.strip()
            if not line:
                continue
            fields = line.split("|")
            if len(fields) < 3:
                continue

            wav_key = fields[0] + ".wav"

            if wav_key not in emotion_data:
                # Not labelled: keep the line, mark the emotion fields as NA
                print(f'{line}')
                fout.write(f"{line}|NA|NA|NA\n")
                continue

            emo = emotion_data[wav_key]
            emo_class = emo.get("EmoClass", "NA")
            emo_act = round_half(emo.get("EmoAct", 0))
            emo_val = round_half(emo.get("EmoVal", 0))
            # Append the labels to the end of the line
            fout.write(f"{line}|{emo_class}|{emo_act}|{emo_val}\n")

    print(f"✅ 合并完成，输出保存至：{output_path}")

# Each call rewrites one list; the train call is disabled here, so only val.txt is regenerated.
# process_emotion_file(train_orignal_path, output_train_path, emotion_data)
process_emotion_file(val_orignal_path, output_val_path, emotion_data)


✅ 合并完成，输出保存至：/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/val.txt


In [2]:
'''合并LibriTTS和MSP的speakers.json'''
import json
import shutil
import os  # local import: this cell only imports json and shutil itself

libri_speakers_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/LibriTTS/speakers.json"
msp_speakers_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/speakers.json"
msp_backup_path = "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/original_speakers.json"

# Back up the MSP speaker map only once. Moving unconditionally would, on a second
# run, overwrite the pristine backup with the already-merged map and every id would
# then be prefixed twice.
if not os.path.exists(msp_backup_path):
    shutil.move(msp_speakers_path, msp_backup_path)

with open(libri_speakers_path) as f:
    libri_spk = json.load(f)
with open(msp_backup_path) as f:
    msp_spk = json.load(f)

# Prefix every speaker name with its dataset and renumber consecutively:
# LibriTTS speakers first, MSP speakers after them.
merged_spk = {}
cur_id = 0

for name in libri_spk:
    merged_spk[f"LibriTTS_{name}"] = cur_id
    cur_id += 1

for name in msp_spk:
    merged_spk[f"MSP_{name}"] = cur_id
    cur_id += 1

# The merged map takes the place of MSP/speakers.json
with open(msp_speakers_path, "w") as f:
    json.dump(merged_spk, f, indent=4)


In [5]:
'''为speaker id添加前缀'''
import os

def add_prefix_to_speaker(line, dataset_name):
    """Prefix the speaker id (second ``|``-separated field) with ``<dataset_name>_``.

    Args:
        line: One metadata line; surrounding whitespace is stripped.
        dataset_name: Dataset name used as the prefix.

    Returns:
        The stripped line with the prefixed speaker id and no trailing newline.
        Lines with fewer than two fields are returned stripped but otherwise
        unchanged. An id that already carries the prefix is left as is, so processing
        a file twice does not produce ids such as ``MSP_MSP_123``.
    """
    parts = line.strip().split('|')
    if len(parts) >= 2:
        prefix = f"{dataset_name}_"
        # Idempotence guard: skip ids that were prefixed by an earlier run
        if not parts[1].startswith(prefix):
            parts[1] = prefix + parts[1]
    return '|'.join(parts)

def process_file(file_path):
    """Rewrite ``file_path`` in place with dataset-prefixed speaker ids.

    The dataset name is the name of the directory that contains the file. Every line
    goes through ``add_prefix_to_speaker`` and is written to ``<file_path>.temp``,
    which then replaces the original file.
    """
    # Dataset name is derived from the file's parent directory
    dataset_name = os.path.basename(os.path.dirname(file_path))
    temp_path = file_path + '.temp'

    with open(file_path, 'r', encoding='utf-8') as fin, open(temp_path, 'w', encoding='utf-8') as fout:
        fout.writelines(add_prefix_to_speaker(line, dataset_name) + '\n' for line in fin)

    # Swap the rewritten file in for the original
    os.replace(temp_path, file_path)
    print(f"✅ 已完成{os.path.basename(file_path)}中{dataset_name}数据集的speaker id前缀添加")

# Files to process; the dataset prefix is taken from each file's parent directory name
files_to_process = [
    "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/train.txt",
    "/home/you/workspace/son/FastSpeech2/preprocessed_data/MSP/val.txt",
    "/home/you/workspace/son/FastSpeech2/preprocessed_data/LibriTTS/train.txt",
    "/home/you/workspace/son/FastSpeech2/preprocessed_data/LibriTTS/val.txt"
]

# Process every listed file; files that do not exist are reported and skipped
for file_path in files_to_process:
    if os.path.exists(file_path):
        process_file(file_path)
    else:
        print(f"⚠️ 文件不存在: {file_path}")




✅ 已完成val.txt中MSP数据集的speaker id前缀添加
✅ 已完成train.txt中LibriTTS数据集的speaker id前缀添加
✅ 已完成val.txt中LibriTTS数据集的speaker id前缀添加
