In [1]:
import os
import requests
import tarfile
from tqdm import tqdm
import json
import shutil

In [None]:

# Directory that receives the downloaded archives and their extracted contents.
save_path = "nsynth_data"
os.makedirs(save_path, exist_ok=True)

# Split label -> URL of the archive to download for that split.
datasets = {
    "train": "http://download.magenta.tensorflow.org/datasets/nsynth/nsynth-train.jsonwav.tar.gz",
    "valid": "http://download.magenta.tensorflow.org/datasets/nsynth/nsynth-valid.jsonwav.tar.gz",
    "test":  "http://download.magenta.tensorflow.org/datasets/nsynth/nsynth-test.jsonwav.tar.gz"
}


def download_file(url, filename):
    """Stream ``url`` to ``filename`` with a progress bar, skipping existing files.

    The payload is written to ``filename + ".part"`` and only renamed to
    ``filename`` once the transfer has finished. An interrupted or failed
    download therefore never leaves a truncated file behind that a later run
    would mistake for a complete one.

    Args:
        url: Address of the file to fetch.
        filename: Destination path on disk.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if os.path.exists(filename):
        print(f"✅ 已存在：{filename}")
        return

    partial_path = filename + ".part"
    block_size = 1024 * 1024  # 1 MiB chunks keep per-iteration overhead low

    # The timeout stops a stalled connection from hanging the cell forever.
    with requests.get(url, stream=True, timeout=60) as response:
        # Fail loudly instead of saving an HTML error page as the archive.
        response.raise_for_status()
        total_size = int(response.headers.get('content-length', 0))
        try:
            with open(partial_path, 'wb') as f, tqdm(
                total=total_size,
                unit='iB',
                unit_scale=True,
                desc=os.path.basename(filename),
            ) as t:
                for data in response.iter_content(block_size):
                    f.write(data)
                    t.update(len(data))
        except BaseException:
            # Also covers KeyboardInterrupt: drop the incomplete file, then
            # let the original exception propagate.
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise

    # Publish the finished download under its final name.
    os.replace(partial_path, filename)


def extract_tar_gz(file_path, extract_to):
    """Unpack the gzip-compressed tar archive ``file_path`` into ``extract_to``.

    Members are not allowed to land outside ``extract_to``. On Python versions
    that provide extraction filters the ``"data"`` filter is used; otherwise
    each member's resolved path is checked before it is extracted.

    Args:
        file_path: Path of the ``.tar.gz`` archive.
        extract_to: Directory that receives the archive contents.

    Raises:
        tarfile.TarError: If a member would be written outside ``extract_to``.
    """
    print(f"📦 正在解压：{os.path.basename(file_path)}")
    with tarfile.open(file_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=extract_to, filter="data")
        else:
            # Fallback for interpreters without extraction filters: this only
            # validates member paths, it does not inspect link targets.
            dest_root = os.path.realpath(extract_to)
            for member in tar:
                member_path = os.path.realpath(os.path.join(dest_root, member.name))
                inside = member_path == dest_root or member_path.startswith(dest_root + os.sep)
                if not inside:
                    raise tarfile.TarError(
                        f"refusing to extract {member.name!r} outside {extract_to!r}"
                    )
                tar.extract(member, path=extract_to)
    print("✅ 解压完成！")

# For every split: fetch its archive (skipped when already on disk), then unpack it.
for name in datasets:
    url = datasets[name]
    print(f"\n⏬ 正在处理：{name.upper()} 数据集")
    archive_name = os.path.basename(url)
    file_name = os.path.join(save_path, archive_name)
    download_file(url, file_name)
    extract_tar_gz(file_name, save_path)


⏬ 正在处理：TRAIN 数据集
✅ 已存在：nsynth_data/nsynth-train.jsonwav.tar.gz
📦 正在解压：nsynth-train.jsonwav.tar.gz


KeyboardInterrupt: 

In [None]:
# Root folder of the extracted data and the sub-folder used by each split.
nsynth_root = "nsynth_data"
splits = {
    "train": "nsynth-train",
    "valid": "nsynth-valid",
    "test":  "nsynth-test"
}

# Output directory
output_dir = os.path.join(nsynth_root, "categorized_by_instrument_split")
os.makedirs(output_dir, exist_ok=True)

# Walk through train / valid / test
for split_name, split_folder in splits.items():
    print(f"\n📂 正在处理：{split_name.upper()}")

    json_path = os.path.join(nsynth_root, split_folder, "examples.json")
    audio_path = os.path.join(nsynth_root, split_folder, "audio")

    # Load the metadata (sample key -> attribute dict)
    with open(json_path, "r") as f:
        metadata = json.load(f)

    created_dirs = set()  # target directories already created for this split
    missing_count = 0     # samples listed in the metadata without a wav file

    # Visit every sample
    for key, item in tqdm(metadata.items()):
        instrument_family = item["instrument_family_str"]
        src_file = os.path.join(audio_path, f"{key}.wav")

        if not os.path.exists(src_file):
            missing_count += 1
            continue

        # Target directory: <output>/<instrument>/<split>/
        target_dir = os.path.join(output_dir, instrument_family, split_name)
        if target_dir not in created_dirs:
            # Create each directory once instead of once per sample.
            os.makedirs(target_dir, exist_ok=True)
            created_dirs.add(target_dir)

        # Copy
        shutil.copy(src_file, os.path.join(target_dir, f"{key}.wav"))

    # Report skipped samples instead of dropping them silently.
    if missing_count:
        print(f"⚠️ {split_name}：有 {missing_count} 个音频文件缺失，已跳过")

print("\n✅ 所有音频已按“乐器种类 + 子集类型”完成分类。输出路径：", output_dir)


📂 正在处理：TRAIN


100%|██████████| 289205/289205 [02:04<00:00, 2322.34it/s]



📂 正在处理：VALID


100%|██████████| 12678/12678 [00:04<00:00, 3065.29it/s]



📂 正在处理：TEST


100%|██████████| 4096/4096 [00:01<00:00, 3038.56it/s]


✅ 所有音频已按“乐器种类 + 子集类型”完成分类。输出路径： nsynth_data/categorized_by_instrument_split





In [None]:
# Root folder of the extracted data and the sub-folder used by each split.
nsynth_root = "nsynth_data"
splits = {
    "train": "nsynth-train",
    "valid": "nsynth-valid",
    "test": "nsynth-test"
}

# Instrument families to keep, and the source type they must have
target_instruments = ["guitar", "keyboard", "reed", "string"]
target_source = "acoustic"

# Output directory
output_dir = os.path.join(nsynth_root, "categorized_acoustic2")
os.makedirs(output_dir, exist_ok=True)

for split_name, split_folder in splits.items():
    print(f"\n📂 正在处理数据集：{split_name.upper()}")

    json_path = os.path.join(nsynth_root, split_folder, "examples.json")
    audio_path = os.path.join(nsynth_root, split_folder, "audio")

    # Load the metadata
    with open(json_path, "r") as f:
        metadata = json.load(f)

    count = 0             # files copied for this split
    missing_count = 0     # matching samples without a wav file on disk
    created_dirs = set()  # destination directories already created

    for key, item in tqdm(metadata.items()):
        inst = item["instrument_family_str"]
        source = item["instrument_source_str"]

        if inst not in target_instruments or source != target_source:
            continue

        src = os.path.join(audio_path, f"{key}.wav")
        if not os.path.exists(src):
            missing_count += 1
            continue

        dst_dir = os.path.join(output_dir, inst, split_name)
        if dst_dir not in created_dirs:
            # Create each directory once instead of once per sample.
            os.makedirs(dst_dir, exist_ok=True)
            created_dirs.add(dst_dir)

        shutil.copy(src, os.path.join(dst_dir, f"{key}.wav"))
        count += 1

    print(f"✅ 完成 {split_name}：共复制 {count} 个 {target_source} 音频样本")
    # Report skipped samples instead of dropping them silently.
    if missing_count:
        print(f"⚠️ {split_name}：有 {missing_count} 个音频文件缺失，已跳过")

# Use the configured source type rather than a hard-coded copy of it.
print(f"\n🎉 所有 {target_source} 类型音频分类完成，保存路径：", output_dir)


📂 正在处理数据集：TRAIN


100%|██████████| 289205/289205 [00:19<00:00, 14668.19it/s]


✅ 完成 train：共复制 52145 个 acoustic 音频样本

📂 正在处理数据集：VALID


100%|██████████| 12678/12678 [00:01<00:00, 10988.43it/s]


✅ 完成 valid：共复制 3337 个 acoustic 音频样本

📂 正在处理数据集：TEST


100%|██████████| 4096/4096 [00:00<00:00, 10967.92it/s]

✅ 完成 test：共复制 1141 个 acoustic 音频样本

🎉 所有 acoustic 类型音频分类完成，保存路径： nsynth_data/categorized_acoustic2





In [None]:
import random

# Root folder of the extracted data and the sub-folder used by each split.
nsynth_root = "nsynth_data"
splits = {
    "train": "nsynth-train",
    "valid": "nsynth-valid",
    "test":  "nsynth-test"
}

# Instrument families that were already selected; they are excluded here
excluded_instruments = ["guitar", "keyboard", "reed", "string"]

# Number of samples to draw at random from each split
samples_per_split = {
    "train": 500,
    "valid": 200,
    "test": 200
}

# Fixed seed so that re-running the cell selects the same files.
SAMPLING_SEED = 42
rng = random.Random(SAMPLING_SEED)

# Numeric id -> name mappings
instrument_family_map = {
    0: "bass", 1: "brass", 2: "flute", 3: "guitar", 4: "keyboard",
    5: "mallet", 6: "organ", 7: "reed", 8: "string", 9: "synth_lead", 10: "vocal"
}
source_map = {0: "acoustic", 1: "electronic", 2: "synthetic"}

# Output directory
output_dir = os.path.join(nsynth_root, "extra_acoustic_random_flat")
os.makedirs(output_dir, exist_ok=True)

for split_name, split_folder in splits.items():
    print(f"\n🎯 正在处理 {split_name.upper()}")

    json_path = os.path.join(nsynth_root, split_folder, "examples.json")
    audio_path = os.path.join(nsynth_root, split_folder, "audio")

    # Load the metadata
    with open(json_path, "r") as f:
        metadata = json.load(f)

    # Collect every acoustic sample whose family is not excluded
    candidates = []
    for key, item in metadata.items():
        inst = instrument_family_map[item["instrument_family"]]
        source = source_map[item["instrument_source"]]
        if inst in excluded_instruments or source != "acoustic":
            continue
        # Only offer files that exist on disk, so the copy below cannot fail
        # on a missing source and the requested sample size is honoured.
        if os.path.exists(os.path.join(audio_path, key + ".wav")):
            candidates.append(key)

    print(f"🔍 可用样本数量：{len(candidates)}")

    # Random selection, capped at the number of available candidates
    num_to_sample = min(samples_per_split[split_name], len(candidates))
    selected = rng.sample(candidates, num_to_sample)
    print(f"✅ 选取数量：{num_to_sample}")

    # Copy into the target folder (grouped by split only)
    split_out_dir = os.path.join(output_dir, split_name)
    os.makedirs(split_out_dir, exist_ok=True)

    for key in tqdm(selected):
        src = os.path.join(audio_path, key + ".wav")
        dst = os.path.join(split_out_dir, key + ".wav")
        shutil.copy(src, dst)

print("\n🎉 全部完成！所有文件按 train/valid/test 分类保存。")


🎯 正在处理 TRAIN
🔍 可用样本数量：50020
✅ 选取数量：500


100%|██████████| 500/500 [00:00<00:00, 2625.65it/s]


🎯 正在处理 VALID





🔍 可用样本数量：1787
✅ 选取数量：200


100%|██████████| 200/200 [00:00<00:00, 2511.58it/s]



🎯 正在处理 TEST
🔍 可用样本数量：548
✅ 选取数量：200


100%|██████████| 200/200 [00:00<00:00, 3589.37it/s]


🎉 全部完成！所有文件按 train/valid/test 分类保存。





In [None]:
# pydub is tried first by crop_wav; record whether it can be imported.
try:
    from pydub import AudioSegment
    has_pydub = True
except ImportError:
    has_pydub = False

# soundfile is the fallback backend. Importing it raises OSError (not
# ImportError) when the underlying libsndfile library cannot be loaded.
try:
    import soundfile as sf
    has_sf = True
except (ImportError, OSError):
    has_sf = False

# scipy's wavfile is kept importable for compatibility, but none of the
# cropping helpers use it, so its absence must not disable the soundfile path.
try:
    from scipy.io import wavfile
except ImportError:
    wavfile = None

# Target instruments: numeric instrument_family id -> label used as folder name
selected_families = {
    0: 'bass',
    3: 'guitar',
    8: 'string',
}

# Path of each split (the archives must already be extracted)
subsets = {
    'train': './nsynth_data/nsynth-train',
    'valid': './nsynth_data/nsynth-valid',
    'test': './nsynth_data/nsynth-test',
}

# Output directory
output_root = 'nsynth-split'
os.makedirs(output_root, exist_ok=True)

# Cropping parameter: length, in seconds, kept from the start of each clip
clip_seconds = 2

def crop_wav_pydub(in_path, out_path, seconds=2):
    """Write the leading ``seconds`` of ``in_path`` to ``out_path`` using pydub.

    Any exception is reported on stdout and turned into a ``False`` result.

    Returns:
        True when the trimmed file was exported, False otherwise.
    """
    try:
        # pydub slices are expressed in milliseconds.
        head = AudioSegment.from_wav(in_path)[:seconds * 1000]
        head.export(out_path, format='wav')
    except Exception as err:
        print(f"[pydub] 裁剪失败: {in_path} → {err}")
        return False
    else:
        return True

def crop_wav_sf(in_path, out_path, seconds=2):
    """Write the leading ``seconds`` of ``in_path`` to ``out_path`` using soundfile.

    Any exception is reported on stdout and turned into a ``False`` result.

    Returns:
        True when the trimmed file was written, False otherwise.
    """
    try:
        samples, rate = sf.read(in_path)
        # Number of leading frames to keep at this sample rate.
        keep = int(seconds * rate)
        sf.write(out_path, samples[:keep], rate)
    except Exception as err:
        print(f"[sf] 裁剪失败: {in_path} → {err}")
        return False
    else:
        return True

def crop_wav(in_path, out_path, seconds=2):
    """Crop ``in_path`` to its first ``seconds`` seconds and save it to ``out_path``.

    pydub is tried first when it is available; soundfile is used when pydub is
    unavailable or fails on this file.

    Returns:
        True if one of the backends produced the output file, False otherwise.
    """
    if has_pydub and crop_wav_pydub(in_path, out_path, seconds):
        return True
    if has_sf:
        return crop_wav_sf(in_path, out_path, seconds)
    # Only claim that no library is available when that is actually the case;
    # a pydub failure has already been reported by crop_wav_pydub.
    if not has_pydub:
        print("❌ 没有可用的裁剪库（pydub 或 soundfile）")
    return False

# Main processing: crop every clip of the selected families, split by split
total = 0
for subset, path in subsets.items():
    json_path = os.path.join(path, 'examples.json')
    audio_dir = os.path.join(path, 'audio')

    # A split without metadata cannot be processed; move on to the next one.
    if not os.path.exists(json_path):
        print(f"⚠️ 缺少 metadata 文件: {json_path}")
        continue

    with open(json_path, 'r') as f:
        metadata = json.load(f)

    print(f"📂 正在处理 {subset} 子集...")
    for filename, info in metadata.items():
        family = info['instrument_family']
        if family not in selected_families:
            continue

        label = selected_families[family]
        dst_dir = os.path.join(output_root, subset, label)
        src = os.path.join(audio_dir, filename + '.wav')
        dst = os.path.join(dst_dir, filename + '.wav')
        os.makedirs(dst_dir, exist_ok=True)

        if not os.path.exists(src):
            print(f"⚠️ 音频文件不存在: {src}")
            continue

        # Count only the clips that were actually written.
        total += 1 if crop_wav(src, dst, seconds=clip_seconds) else 0

print(f"\n✅ 完成！共处理并裁剪 {total} 个音频文件，保存在 '{output_root}' 文件夹中。")

📂 正在处理 train 子集...
📂 正在处理 valid 子集...
📂 正在处理 test 子集...

✅ 完成！共处理并裁剪 124972 个音频文件，保存在 'nsynth-split' 文件夹中。


In [None]:
from pydub import AudioSegment
# Change this to your actual nsynth data directory
base_dir = "./nsynth_data"  # the directory containing nsynth-train, nsynth-valid and nsynth-test
output_dir = "./output"

# Instrument family -> source type a sample of that family must have to be kept
target_instruments = {
    "guitar": "acoustic",
    "string": "acoustic",
    "bass": "electronic"
}

# Split name -> sub-folder of base_dir holding that split
splits = {
    "train": "nsynth-train",
    "valid": "nsynth-valid",
    "test": "nsynth-test"
}

# Target clip duration (milliseconds)
clip_duration_ms = 1500

def ensure_dir(path):
    """Create directory ``path``, including missing parents, if it is absent.

    ``exist_ok=True`` replaces the separate existence check, so a directory
    that appears between the check and the creation can no longer raise.
    Unlike the previous check, a non-directory already at ``path`` now raises
    ``FileExistsError`` instead of being silently accepted.
    """
    os.makedirs(path, exist_ok=True)

# For every split, export the first clip_duration_ms of each matching sample.
for split, folder in splits.items():
    print(f"🔍 正在处理 {split} 数据...")

    json_path = os.path.join(base_dir, folder, "examples.json")
    audio_path = os.path.join(base_dir, folder, "audio")

    with open(json_path, "r") as f:
        metadata = json.load(f)

    for key, meta in metadata.items():
        family = meta["instrument_family_str"]
        source = meta["instrument_source_str"]

        # Skip samples that do not meet the family/source selection
        if family not in target_instruments or source != target_instruments[family]:
            continue

        dst_dir = os.path.join(output_dir, family, split)
        ensure_dir(dst_dir)
        src_audio = os.path.join(audio_path, f"{key}.wav")
        dst_audio = os.path.join(dst_dir, f"{key}.wav")

        # A failure on one file is reported and does not stop the run.
        try:
            audio = AudioSegment.from_wav(src_audio)
            audio[:clip_duration_ms].export(dst_audio, format="wav")
        except Exception as err:
            print(f"⚠️ 无法处理 {key}: {err}")

print("✅ 所有音频提取与裁剪完成！")

🔍 正在处理 train 数据...
🔍 正在处理 valid 数据...
🔍 正在处理 test 数据...
✅ 所有音频提取与裁剪完成！


In [None]:
import random
base_dir = "./nsynth_data"  # the directory containing nsynth-train, nsynth-valid and nsynth-test
output_dir = "./output"

# Families left out of the "other" group, and the source type a sample must have
excluded_instruments = {"guitar", "bass", "string"}
target_source = "acoustic"
# Length, in milliseconds, kept from the start of each clip
clip_duration_ms = 1500

def ensure_dir(path):
    """Create directory ``path``, including missing parents, if it is absent.

    ``exist_ok=True`` replaces the separate existence check, so a directory
    that appears between the check and the creation can no longer raise.
    Unlike the previous check, a non-directory already at ``path`` now raises
    ``FileExistsError`` instead of being silently accepted.
    """
    os.makedirs(path, exist_ok=True)

def extract_all_other_acoustic(split_name, folder_name):
    """Crop and export every non-excluded sample of the target source for one split.

    Reads ``<base_dir>/<folder_name>/examples.json``, keeps the samples whose
    family is not in ``excluded_instruments`` and whose source equals
    ``target_source``, and exports the first ``clip_duration_ms`` of each one
    to ``<output_dir>/other/<split_name>/<family>_<key>.wav``. Files that fail
    to process are reported and skipped.

    Args:
        split_name: Label used in messages and as the output sub-folder.
        folder_name: Name of the split's folder inside ``base_dir``.
    """
    print(f"\n🎧 正在提取 {split_name} 中所有非 guitar/bass/string 且为 acoustic 的音频...")

    split_root = os.path.join(base_dir, folder_name)
    wav_dir = os.path.join(split_root, "audio")
    with open(os.path.join(split_root, "examples.json"), "r") as fh:
        examples = json.load(fh)

    extracted = 0
    for note_id, note in examples.items():
        family = note["instrument_family_str"]
        source = note["instrument_source_str"]
        if family in excluded_instruments or source != target_source:
            continue

        out_dir = os.path.join(output_dir, "other", split_name)
        ensure_dir(out_dir)
        wav_in = os.path.join(wav_dir, f"{note_id}.wav")
        wav_out = os.path.join(out_dir, f"{family}_{note_id}.wav")

        try:
            clip = AudioSegment.from_wav(wav_in)
            clip[:clip_duration_ms].export(wav_out, format="wav")
        except Exception as err:
            print(f"⚠️ 无法处理 {note_id}: {err}")
        else:
            extracted += 1

    print(f"✅ 完成：共提取 {extracted} 条样本 -> {split_name}/")

# Run the extraction for the train and test splits
extract_all_other_acoustic("train", "nsynth-train")
extract_all_other_acoustic("test", "nsynth-test")

# Report the real destination (relative to output_dir) rather than a
# hard-coded path that looks like it sits at the filesystem root.
print(
    f"\n🚀 所有提取完成，结果保存在 {os.path.join(output_dir, 'other', 'train')} "
    f"和 {os.path.join(output_dir, 'other', 'test')}"
)


🎧 正在提取 train 中所有非 guitar/bass/string 且为 acoustic 的音频...
✅ 完成：共提取 0 条样本 -> train/

🎧 正在提取 test 中所有非 guitar/bass/string 且为 acoustic 的音频...
✅ 完成：共提取 0 条样本 -> test/

🚀 所有提取完成，结果保存在 /output/other/train 和 /output/other/test


In [10]:
import os
import json
import shutil

# NSynth path settings
base_dir = "./nsynth_data"       # change this to your NSynth path
output_dir = "./output1"         # output path

# Families left out of the "other" group, and the source type a sample must have
excluded_instruments = {"guitar", "bass", "string"}
target_source = "acoustic"

def ensure_dir(path):
    """Create directory ``path``, including missing parents, if it is absent.

    ``exist_ok=True`` replaces the separate existence check, so a directory
    that appears between the check and the creation can no longer raise.
    Unlike the previous check, a non-directory already at ``path`` now raises
    ``FileExistsError`` instead of being silently accepted.
    """
    os.makedirs(path, exist_ok=True)

def extract_all_other_acoustic(split_name, folder_name):
    """Copy every non-excluded sample of the target source for one split.

    Reads ``<base_dir>/<folder_name>/examples.json``, keeps the samples whose
    family is not in ``excluded_instruments`` and whose source equals
    ``target_source``, and copies each one, uncropped, to
    ``<output_dir>/other/<split_name>/<family>/<family>_<key>.wav``. Files
    that fail to copy are reported and skipped.

    Args:
        split_name: Label used in messages and as the output sub-folder.
        folder_name: Name of the split's folder inside ``base_dir``.
    """
    print(f"\n📁 正在提取 {split_name} 中所有 acoustic + 非 guitar/bass/string 的音频")

    split_root = os.path.join(base_dir, folder_name)
    wav_dir = os.path.join(split_root, "audio")
    with open(os.path.join(split_root, "examples.json"), "r") as fh:
        examples = json.load(fh)

    copied = 0
    for note_id, note in examples.items():
        family = note["instrument_family_str"]
        source = note["instrument_source_str"]
        if family in excluded_instruments or source != target_source:
            continue

        out_dir = os.path.join(output_dir, "other", split_name, family)
        ensure_dir(out_dir)
        wav_in = os.path.join(wav_dir, f"{note_id}.wav")
        wav_out = os.path.join(out_dir, f"{family}_{note_id}.wav")

        try:
            shutil.copy2(wav_in, wav_out)
        except Exception as err:
            print(f"⚠️ 复制失败 {note_id}: {err}")
        else:
            copied += 1

    print(f"✅ 提取完成：共复制 {copied} 个音频到 {split_name}/")

# Run the extraction for the train and test splits
extract_all_other_acoustic("train", "nsynth-train")
extract_all_other_acoustic("test", "nsynth-test")

# Report the real destination (relative to output_dir) rather than a
# hard-coded path that looks like it sits at the filesystem root.
print(f"\n🎉 所有音频提取完成！保存在 {os.path.join(output_dir, 'other')}")



📁 正在提取 train 中所有 acoustic + 非 guitar/bass/string 的音频
✅ 提取完成：共复制 71195 个音频到 train/

📁 正在提取 test 中所有 acoustic + 非 guitar/bass/string 的音频
✅ 提取完成：共复制 902 个音频到 test/

🎉 所有音频提取完成！保存在 /output1/other/


In [9]:
import os
import json
import shutil

# Change this to your actual nsynth data directory
base_dir = "./nsynth_data"  # the directory containing nsynth-train, nsynth-valid and nsynth-test
output_dir = "./output1"

# Instrument families to keep and the source type each must have
target_instruments = {
    "guitar": "acoustic",
    "string": "acoustic",
    "bass": "electronic"
}

# Split name -> sub-folder of base_dir holding that split
splits = {
    "train": "nsynth-train",
    "valid": "nsynth-valid",
    "test": "nsynth-test"
}

def ensure_dir(path):
    """Create directory ``path``, including missing parents, if it is absent.

    ``exist_ok=True`` replaces the separate existence check, so a directory
    that appears between the check and the creation can no longer raise.
    Unlike the previous check, a non-directory already at ``path`` now raises
    ``FileExistsError`` instead of being silently accepted.
    """
    os.makedirs(path, exist_ok=True)

# For every split, copy each matching sample unchanged into <output_dir>/<family>/<split>/.
for split, folder in splits.items():
    print(f"🔍 正在处理 {split} 数据...")

    json_path = os.path.join(base_dir, folder, "examples.json")
    audio_path = os.path.join(base_dir, folder, "audio")

    with open(json_path, "r") as f:
        metadata = json.load(f)

    for key, meta in metadata.items():
        family = meta["instrument_family_str"]
        source = meta["instrument_source_str"]

        # Skip samples that do not meet the family/source selection
        if family not in target_instruments or source != target_instruments[family]:
            continue

        dst_dir = os.path.join(output_dir, family, split)
        ensure_dir(dst_dir)
        src_audio = os.path.join(audio_path, f"{key}.wav")
        dst_audio = os.path.join(dst_dir, f"{key}.wav")

        # A failure on one file is reported and does not stop the run.
        try:
            shutil.copy2(src_audio, dst_audio)
        except Exception as err:
            print(f"⚠️ 无法复制 {key}: {err}")

print("✅ 所有音频提取完成（未裁剪）！")

🔍 正在处理 train 数据...
🔍 正在处理 valid 数据...
🔍 正在处理 test 数据...
✅ 所有音频提取完成（未裁剪）！
