# Raw Audios -> wav Audios
Audio files ori -> Audio files

In [None]:
import os
import shutil
import re

# 设定原始和目标文件夹的完整路径
src_base_dir = "E:/AMR/DA/Projekt/data/Audio_files_ori"  # 原始文件夹
dst_base_dir = "E:/AMR/DA/Projekt/data/Audio_files"  # 目标文件夹

# 确保目标文件夹存在
os.makedirs(dst_base_dir, exist_ok=True)

# 正则表达式匹配 XC 编号格式，例如 "XC123456"
xc_pattern = re.compile(r"(XC\d+)")

# 遍历原始文件夹
for folder in os.listdir(src_base_dir):
    src_folder_path = os.path.join(src_base_dir, folder)
    dst_folder_path = os.path.join(dst_base_dir, folder)

    # 确保目标子文件夹存在
    os.makedirs(dst_folder_path, exist_ok=True)

    # 确保是文件夹
    if os.path.isdir(src_folder_path):
        for file in os.listdir(src_folder_path):
            if file.endswith((".mp3", ".ogg", ".wav")):  # 处理音频文件
                match = xc_pattern.search(file)  # 提取 XC 编号
                if match:
                    number = match.group(1)  # 只保留 "XC123456"
                    new_filename = f"{number}.wav"  # 统一改为 "XC123456.wav"

                    # 原始文件路径和目标文件路径
                    src_file_path = os.path.join(src_folder_path, file)
                    dst_file_path = os.path.join(dst_folder_path, new_filename)

                    # 复制并重命名
                    shutil.copy2(src_file_path, dst_file_path)
                    print(f"已处理: {src_file_path} -> {dst_file_path}")

print("所有文件已重命名并移动至目标文件夹！")


# Train meta csv Generator

In [1]:
import os
import pandas as pd

# 设置数据文件夹路径
base_dir = "E:/AMR/DA/Projekt/data/Audio_files"  # 这里可以改成你的实际路径

# 用于存储数据
data = []

# 遍历 Audio_files 文件夹
for folder in os.listdir(base_dir):
    folder_path = os.path.join(base_dir, folder)
    
    # 只处理文件夹
    if os.path.isdir(folder_path):
        # 解析文件夹名称，获取 vocalization 和 bird_name
        parts = folder.split(" - ")
        if len(parts) != 2:
            print(f"跳过文件夹：{folder}，命名格式不符合预期")
            continue
        
        vocalization, bird_name = parts
        
        # 遍历该类别下的所有音频文件
        for file in os.listdir(folder_path):
            if file.endswith(".wav"):  # 只处理 .wav 文件
                number = file.replace(".wav", "")  # 提取 XC 编号
                full_path = os.path.join(folder_path, file)  # 记录完整路径（绝对路径）
                full_path = full_path.replace("\\", "/")  # 统一路径分隔符
                data.append([bird_name, vocalization, number, full_path])

# 创建 DataFrame
df = pd.DataFrame(data, columns=["bird_name", "vocalization", "number", "path"])

# 保存 CSV
csv_path = "E:/AMR/DA/Projekt/data/Audio_files/train_meta_100.csv"
df.to_csv(csv_path, index=False, encoding="utf-8")
print(f"CSV 文件已保存至 {csv_path}")


跳过文件夹：audio_file_counts.csv，命名格式不符合预期
CSV 文件已保存至 E:/AMR/DA/Projekt/data/Audio_files/train_meta_100.csv


# Audio Split, Spectogram and all data meta.csv

In [None]:
import os
import numpy as np
import pandas as pd
import librosa
import cv2
import math
import cupy as cp
from cupyx.scipy import signal as cupy_signal
from tqdm import tqdm

# 配置参数
class config:
    SEED = 2024
    DEVICE = 'cpu'
    OUTPUT_DIR = "E:/AMR/DA/Projekt/data/Audio_spec"  # 存储频谱数据
    FS = 32000  # 采样率
    N_FFT = 1095  # FFT 点数
    WIN_SIZE = 412  # 频谱窗口大小
    WIN_LAP = 100  # 频谱窗口重叠大小
    MIN_FREQ = 40  # 最小频率
    MAX_FREQ = 15000  # 最大频率
    SEGMENT_DURATION = 3  # 每段 3 秒
    SPEC_SIZE = (256, 256)  # 频谱图大小 (宽, 高)

# 确保输出目录存在
os.makedirs(config.OUTPUT_DIR, exist_ok=True)

# 读取 train_meta.csv
csv_path = "E:/AMR/DA/Projekt/data/Audio_files/train_meta_100_deduplicated.csv"
train_df = pd.read_csv(csv_path)

# 频谱转换函数
def oog2spec_via_cupy(audio_data):
    audio_data = cp.array(audio_data)
    
    # 处理 NaN 数据
    mean_signal = cp.nanmean(audio_data)
    audio_data = cp.nan_to_num(audio_data, nan=mean_signal) if cp.isnan(audio_data).mean() < 1 else cp.zeros_like(audio_data)
    
    # 计算频谱
    frequencies, times, spec_data = cupy_signal.spectrogram(
        audio_data, 
        fs=config.FS, 
        nfft=config.N_FFT, 
        nperseg=config.WIN_SIZE, 
        noverlap=config.WIN_LAP, 
        window='hann'
    )

    # 过滤频率范围
    valid_freq = (frequencies >= config.MIN_FREQ) & (frequencies <= config.MAX_FREQ)
    spec_data = spec_data[valid_freq, :]
    
    # 对数变换和归一化
    spec_data = cp.log10(spec_data + 1e-20)
    spec_data = spec_data - spec_data.min()
    spec_data = spec_data / spec_data.max()
    
    return spec_data.get()

# 用于存储处理后的数据
all_data = []

# 处理音频数据
for i, row in tqdm(train_df.iterrows(), total=len(train_df)):
    file_path = row["path"]
    bird_name = row["bird_name"]
    vocalization = row["vocalization"]
    number = row["number"]

    # 读取音频
    try:
        audio_data, _ = librosa.load(file_path, sr=config.FS)
    except Exception as e:
        print(f"加载失败: {file_path}, 错误: {e}")
        continue

    # 计算音频总时长
    total_duration = len(audio_data) / config.FS

    # 按 3s 分割音频
    num_segments = math.floor(total_duration / config.SEGMENT_DURATION)

    for seg_idx in range(num_segments):
        start_idx = seg_idx * config.SEGMENT_DURATION * config.FS
        end_idx = start_idx + config.SEGMENT_DURATION * config.FS
        segment_audio = audio_data[start_idx:end_idx]

        # 转换为频谱图
        spec_data = oog2spec_via_cupy(segment_audio)

        # 调整尺寸
        spec_data = cv2.resize(spec_data, config.SPEC_SIZE, interpolation=cv2.INTER_AREA)

        # 保存频谱数据
        spec_filename = f"{number}_seg{seg_idx}.npy"
        spec_filepath = os.path.join(config.OUTPUT_DIR, spec_filename)
        np.save(spec_filepath, spec_data.astype(np.float32))

        # 记录数据
        all_data.append([bird_name, vocalization, number, seg_idx, spec_filepath])

# 生成 all_data_meta.csv
meta_df = pd.DataFrame(all_data, columns=["bird_name", "vocalization", "number", "segment_index", "path"])
meta_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta.csv"
meta_df.to_csv(meta_csv_path, index=False, encoding="utf-8")

print(f"所有音频处理完成，频谱图存储在 {config.OUTPUT_DIR}")
print(f"元数据 CSV 文件已保存至 {meta_csv_path}")


# 新版生成spec和mel，512 256的

In [2]:
import os
import numpy as np
import pandas as pd
import librosa
import cv2
import math
import cupy as cp
from cupyx.scipy import signal as cupy_signal
from tqdm import tqdm
# 配置参数
# class config:
#     SEED = 2024
#     DEVICE = 'cpu'
#     OUTPUT_DIR = "E:/AMR/DA/Projekt/data/Audio_spec"  # 存储频谱数据
#     FS = 32000  # 采样率
#     N_FFT = 1095  # FFT 点数
#     WIN_SIZE = 412  # 频谱窗口大小
#     WIN_LAP = 100  # 频谱窗口重叠大小
#     MIN_FREQ = 40  # 最小频率
#     MAX_FREQ = 15000  # 最大频率
#     SEGMENT_DURATION = 3  # 每段 3 秒
#     SPEC_SIZE = (256, 256)  # 频谱图大小 (宽, 高)
# 配置参数
class config:
    SEED = 2024
    DEVICE = 'cpu'
    OUTPUT_DIR_SPEC = "E:/AMR/DA/Projekt/data/Audio_spec_paperstyle"  # 线性频谱图目录
    OUTPUT_DIR_MEL = "E:/AMR/DA/Projekt/data/Audio_spec_mel"         # mel频谱图目录
    FS = 48000
    N_FFT = 512
    WIN_SIZE = 512
    WIN_LAP = 384
    MIN_FREQ = 150
    MAX_FREQ = 15000
    SEGMENT_DURATION = 3
    SPEC_SIZE = (512, 256)  # (宽, 高)

# 确保输出目录存在
os.makedirs(config.OUTPUT_DIR_SPEC, exist_ok=True)
os.makedirs(config.OUTPUT_DIR_MEL, exist_ok=True)

# 读取 train_meta.csv
csv_path = "E:/AMR/DA/Projekt/data/Audio_files/train_meta_100_deduplicated.csv"
train_df = pd.read_csv(csv_path)

# 线性频谱图（via cupy）
def oog2spec_via_cupy(audio_data):
    audio_data = cp.array(audio_data)
    mean_signal = cp.nanmean(audio_data)
    audio_data = cp.nan_to_num(audio_data, nan=mean_signal) if cp.isnan(audio_data).mean() < 1 else cp.zeros_like(audio_data)
    
    frequencies, times, spec_data = cupy_signal.spectrogram(
        audio_data,
        fs=config.FS,
        nfft=config.N_FFT,
        nperseg=config.WIN_SIZE,
        noverlap=config.WIN_LAP,
        window='hann'
    )
    valid_freq = (frequencies >= config.MIN_FREQ) & (frequencies <= config.MAX_FREQ)
    spec_data = spec_data[valid_freq, :]
    spec_data = cp.log10(spec_data + 1e-20)
    spec_data = spec_data - spec_data.min()
    spec_data = spec_data / spec_data.max()
    return spec_data.get()

# 新增：mel 频谱图
def audio2mel(audio_data):
    mel_spec = librosa.feature.melspectrogram(
        y=audio_data,
        sr=config.FS,
        n_fft=config.N_FFT,
        hop_length=config.N_FFT - config.WIN_LAP,
        win_length=config.WIN_SIZE,
        window='hann',
        n_mels=64,
        fmin=config.MIN_FREQ,
        fmax=config.MAX_FREQ
    )
    mel_spec = np.log10(mel_spec + 1e-9)
    mel_spec = mel_spec - mel_spec.min()
    mel_spec = mel_spec / mel_spec.max()
    return mel_spec

# 存储处理结果
all_data = []

# 主处理循环
for i, row in tqdm(train_df.iterrows(), total=len(train_df)):
    file_path = row["path"]
    bird_name = row["bird_name"]
    vocalization = row["vocalization"]
    number = row["number"]

    try:
        audio_data, _ = librosa.load(file_path, sr=config.FS)
    except Exception as e:
        print(f"加载失败: {file_path}, 错误: {e}")
        continue

    total_duration = len(audio_data) / config.FS
    num_segments = math.floor(total_duration / config.SEGMENT_DURATION)

    for seg_idx in range(num_segments):
        start_idx = seg_idx * config.SEGMENT_DURATION * config.FS
        end_idx = start_idx + config.SEGMENT_DURATION * config.FS
        segment_audio = audio_data[start_idx:end_idx]

        # ➤ 生成线性频谱图
        spec_data = oog2spec_via_cupy(segment_audio)
        spec_data = cv2.resize(spec_data, config.SPEC_SIZE, interpolation=cv2.INTER_AREA)
        spec_filename = f"{number}_seg{seg_idx}.npy"
        spec_filepath = os.path.join(config.OUTPUT_DIR_SPEC, spec_filename)
        np.save(spec_filepath, spec_data.astype(np.float32))

        # ➤ 生成 mel 频谱图
        mel_data = audio2mel(segment_audio)
        mel_data = cv2.resize(mel_data, config.SPEC_SIZE, interpolation=cv2.INTER_AREA)
        mel_filename = f"{number}_seg{seg_idx}_mel.npy"
        mel_filepath = os.path.join(config.OUTPUT_DIR_MEL, mel_filename)
        np.save(mel_filepath, mel_data.astype(np.float32))

        # ➤ 记录路径信息
        all_data.append([bird_name, vocalization, number, seg_idx, spec_filepath, mel_filepath])

# 保存元数据
meta_df = pd.DataFrame(all_data, columns=[
    "bird_name", "vocalization", "number", "segment_index", "spec_path", "mel_path"
])
meta_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta.csv"
meta_df.to_csv(meta_csv_path, index=False, encoding="utf-8")

print(f"✅ 所有音频处理完成！")
print(f"📁 线性频谱图存储于: {config.OUTPUT_DIR_SPEC}")
print(f"📁 Mel 频谱图存储于: {config.OUTPUT_DIR_MEL}")
print(f"📝 元数据 CSV 文件已保存至: {meta_csv_path}")


  cupy._util.experimental('cupyx.jit.rawkernel')
  audio_data, _ = librosa.load(file_path, sr=config.FS)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  mel_spec = mel_spec / mel_spec.max()
100%|██████████| 7844/7844 [48:57<00:00,  2.67it/s]  


✅ 所有音频处理完成！
📁 线性频谱图存储于: E:/AMR/DA/Projekt/data/Audio_spec_paperstyle
📁 Mel 频谱图存储于: E:/AMR/DA/Projekt/data/Audio_spec_mel
📝 元数据 CSV 文件已保存至: E:/AMR/DA/Projekt/data/all_data_meta.csv


# 整合freefield

In [None]:
import os
import shutil

# 配置参数
source_dir = "E:/AMR/DA/Projekt/data/freefield1010"  # 原始目录
target_dir = "E:/AMR/DA/Projekt/data/negative_audio"  # 目标目录

# 确保目标目录存在
os.makedirs(target_dir, exist_ok=True)

# 遍历原始目录及其子目录
for root, _, files in os.walk(source_dir):
    for file in files:
        if file.endswith(".wav"):  # 只处理 .wav 文件
            source_file = os.path.join(root, file)  # 源文件路径
            target_file = os.path.join(target_dir, file)  # 目标文件路径

            # 如果目标文件夹中没有这个文件，则复制
            if not os.path.exists(target_file):
                shutil.copy2(source_file, target_file)  # 使用 copy2 保留文件的原始元数据
                print(f"复制 {source_file} 到 {target_file}")
            else:
                print(f"跳过 {file}，目标文件已存在。")

print("音频文件传输完成！")


# Add Negative Samples from freefield1010

In [None]:
import os
import numpy as np
import pandas as pd
import librosa
import cv2
import math
import cupy as cp
from cupyx.scipy import signal as cupy_signal
from tqdm import tqdm

# 配置参数
class config:
    SEED = 2024
    DEVICE = 'cpu'
    OUTPUT_DIR = "E:/AMR/DA/Projekt/data/Audio_spec"  # 存储频谱数据
    FS = 32000  # 采样率
    N_FFT = 1095  # FFT 点数
    WIN_SIZE = 412  # 频谱窗口大小
    WIN_LAP = 100  # 频谱窗口重叠大小
    MIN_FREQ = 40  # 最小频率
    MAX_FREQ = 15000  # 最大频率
    SEGMENT_DURATION = 3  # 每段 3 秒
    SPEC_SIZE = (256, 256)  # 频谱图大小 (宽, 高)
    MAX_NEGATIVE_SAMPLES = 3500  # 最大负样本数量

# 确保输出目录存在
os.makedirs(config.OUTPUT_DIR, exist_ok=True)

# 频谱转换函数
def oog2spec_via_cupy(audio_data):
    if len(audio_data) == 0:
        print("⚠️ 警告：音频数据为空，跳过该文件。")
        return None
    
    audio_data = cp.array(audio_data)
    
    # 计算频谱
    frequencies, times, spec_data = cupy_signal.spectrogram(
        audio_data, 
        fs=config.FS, 
        nfft=config.N_FFT, 
        nperseg=config.WIN_SIZE, 
        noverlap=config.WIN_LAP, 
        window='hann'
    )

    # 过滤频率范围
    valid_freq = (frequencies >= config.MIN_FREQ) & (frequencies <= config.MAX_FREQ)
    spec_data = spec_data[valid_freq, :]
    
    # 对数变换和归一化
    spec_data = cp.log10(spec_data + 1e-20)
    spec_data = spec_data - spec_data.min()
    spec_data = spec_data / spec_data.max()
    
    return spec_data.get()

# 处理背景噪音（negative samples）数据
negative_samples_dir = "E:/AMR/DA/Projekt/data/negative_audio"  # 背景噪音音频目录
negative_samples_processed = 0
negative_data = []  # 存储负样本信息

for root, _, files in os.walk(negative_samples_dir):
    for file in tqdm(files, desc="Processing Negative Audio Files"):
        if file.endswith(".wav"):
            file_path = os.path.join(root, file)

            try:
                # 加载音频
                audio_data, _ = librosa.load(file_path, sr=config.FS)

                if len(audio_data) == 0:
                    print(f"⚠️ 警告：文件 {file} 音频数据为空，跳过。")
                    continue
            except Exception as e:
                print(f"❌ 加载失败: {file}, 错误: {e}")
                continue

            # 计算音频总时长
            total_duration = len(audio_data) / config.FS

            # 计算完整的 3 秒片段数量
            num_segments = math.floor(total_duration / config.SEGMENT_DURATION)

            for seg_idx in range(num_segments + 1):  # +1 以考虑最后一个不足3秒的片段
                start_idx = seg_idx * config.SEGMENT_DURATION * config.FS
                end_idx = start_idx + config.SEGMENT_DURATION * config.FS
                segment_audio = audio_data[start_idx:end_idx]

                # 如果最后一个片段不足 3s，则进行 Padding（零填充）
                if len(segment_audio) < config.SEGMENT_DURATION * config.FS:
                    padding = config.SEGMENT_DURATION * config.FS - len(segment_audio)
                    segment_audio = np.pad(segment_audio, (0, padding), mode='constant', constant_values=0)

                # 生成频谱图
                spec_data = oog2spec_via_cupy(segment_audio)

                if spec_data is None:
                    print(f"⚠️ 频谱数据为空，跳过文件 {file} 的片段 {seg_idx}。")
                    continue  # 跳过该片段

                # 调整尺寸
                spec_data = cv2.resize(spec_data, config.SPEC_SIZE, interpolation=cv2.INTER_AREA)

                # 保存频谱数据
                spec_filename = f"negative_{file.split('.')[0]}_seg{seg_idx}.npy"
                spec_filepath = os.path.join(config.OUTPUT_DIR, spec_filename)
                np.save(spec_filepath, spec_data.astype(np.float32))

                # 记录数据
                negative_data.append(["Background Noise", "none", f"negative_{file.split('.')[0]}", seg_idx, spec_filepath])
                negative_samples_processed += 1

                # 达到最大数量后停止
                if negative_samples_processed >= config.MAX_NEGATIVE_SAMPLES:
                    print(f"🎯 已处理 {config.MAX_NEGATIVE_SAMPLES} 个负样本，停止处理。")
                    break

            if negative_samples_processed >= config.MAX_NEGATIVE_SAMPLES:
                break

    if negative_samples_processed >= config.MAX_NEGATIVE_SAMPLES:
        break

# 读取原始 all_data_meta.csv
meta_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta.csv"
if os.path.exists(meta_csv_path):
    existing_df = pd.read_csv(meta_csv_path)
    negative_df = pd.DataFrame(negative_data, columns=["bird_name", "vocalization", "number", "segment_index", "path"])
    combined_df = pd.concat([existing_df, negative_df], ignore_index=True)
else:
    print("⚠️ 未找到原始 all_data_meta.csv，生成新的文件。")
    combined_df = pd.DataFrame(negative_data, columns=["bird_name", "vocalization", "number", "segment_index", "path"])

# 生成新的 CSV 文件
new_meta_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_with_negative.csv"
combined_df.to_csv(new_meta_csv_path, index=False, encoding="utf-8")

print("\n🎯 负样本处理完成！")
print(f"✅ 生成了 {negative_samples_processed} 个负样本（最大 {config.MAX_NEGATIVE_SAMPLES}）")
print(f"📄 频谱数据存储在 {config.OUTPUT_DIR}")
print(f"📊 新的元数据 CSV 文件已保存：{new_meta_csv_path}")


# 新版生成负样本spec和mel，并添加到all data meta 中

In [3]:
import os
import numpy as np
import pandas as pd
import librosa
import cv2
import math
import cupy as cp
from cupyx.scipy import signal as cupy_signal
from tqdm import tqdm

# 配置参数（和主数据一致）
class config:
    SEED = 2024
    DEVICE = 'cpu'
    OUTPUT_DIR_SPEC = "E:/AMR/DA/Projekt/data/Audio_spec_paperstyle"
    OUTPUT_DIR_MEL = "E:/AMR/DA/Projekt/data/Audio_spec_mel"
    FS = 48000
    N_FFT = 512
    WIN_SIZE = 512
    WIN_LAP = 384
    MIN_FREQ = 150
    MAX_FREQ = 15000
    SEGMENT_DURATION = 3
    SPEC_SIZE = (512, 256)
    MAX_NEGATIVE_SAMPLES = 3500

# 确保输出目录存在
os.makedirs(config.OUTPUT_DIR_SPEC, exist_ok=True)
os.makedirs(config.OUTPUT_DIR_MEL, exist_ok=True)

# ➤ 线性频谱图生成
def oog2spec_via_cupy(audio_data):
    if len(audio_data) == 0:
        return None
    audio_data = cp.array(audio_data)
    frequencies, times, spec_data = cupy_signal.spectrogram(
        audio_data,
        fs=config.FS,
        nfft=config.N_FFT,
        nperseg=config.WIN_SIZE,
        noverlap=config.WIN_LAP,
        window='hann'
    )
    valid_freq = (frequencies >= config.MIN_FREQ) & (frequencies <= config.MAX_FREQ)
    spec_data = spec_data[valid_freq, :]
    spec_data = cp.log10(spec_data + 1e-20)
    spec_data = (spec_data - spec_data.min()) / (spec_data.max() - spec_data.min() + 1e-9)
    return spec_data.get()

# ➤ mel 频谱图生成
def audio2mel(audio_data):
    mel_spec = librosa.feature.melspectrogram(
        y=audio_data,
        sr=config.FS,
        n_fft=config.N_FFT,
        hop_length=config.N_FFT - config.WIN_LAP,
        win_length=config.WIN_SIZE,
        window='hann',
        n_mels=64,
        fmin=config.MIN_FREQ,
        fmax=config.MAX_FREQ
    )
    mel_spec = np.log10(mel_spec + 1e-9)
    mel_spec = (mel_spec - mel_spec.min()) / (mel_spec.max() - mel_spec.min() + 1e-9)
    return mel_spec

# ➤ 处理负样本
negative_samples_dir = "E:/AMR/DA/Projekt/data/negative_audio"
negative_samples_processed = 0
negative_data = []

for root, _, files in os.walk(negative_samples_dir):
    for file in tqdm(files, desc="Processing Negative Audio Files"):
        if file.endswith(".wav"):
            file_path = os.path.join(root, file)

            try:
                audio_data, _ = librosa.load(file_path, sr=config.FS)
                if len(audio_data) == 0:
                    continue
            except Exception as e:
                print(f"❌ 加载失败: {file}, 错误: {e}")
                continue

            total_duration = len(audio_data) / config.FS
            num_segments = math.floor(total_duration / config.SEGMENT_DURATION)

            for seg_idx in range(num_segments + 1):  # 包含最后片段
                start_idx = seg_idx * config.SEGMENT_DURATION * config.FS
                end_idx = start_idx + config.SEGMENT_DURATION * config.FS
                segment_audio = audio_data[start_idx:end_idx]

                # 填充不足 3 秒的片段
                if len(segment_audio) < config.SEGMENT_DURATION * config.FS:
                    padding = config.SEGMENT_DURATION * config.FS - len(segment_audio)
                    segment_audio = np.pad(segment_audio, (0, padding), mode='constant')

                # ➤ 生成线性谱图
                spec_data = oog2spec_via_cupy(segment_audio)
                if spec_data is None:
                    continue
                spec_data = cv2.resize(spec_data, config.SPEC_SIZE, interpolation=cv2.INTER_AREA)
                spec_filename = f"negative_{file.split('.')[0]}_seg{seg_idx}.npy"
                spec_path = os.path.join(config.OUTPUT_DIR_SPEC, spec_filename)
                np.save(spec_path, spec_data.astype(np.float32))

                # ➤ 生成 mel 频谱图
                mel_data = audio2mel(segment_audio)
                mel_data = cv2.resize(mel_data, config.SPEC_SIZE, interpolation=cv2.INTER_AREA)
                mel_filename = f"negative_{file.split('.')[0]}_seg{seg_idx}_mel.npy"
                mel_path = os.path.join(config.OUTPUT_DIR_MEL, mel_filename)
                np.save(mel_path, mel_data.astype(np.float32))

                # ➤ 记录路径信息
                negative_data.append(["Background Noise", "none", f"negative_{file.split('.')[0]}", seg_idx, spec_path, mel_path])
                negative_samples_processed += 1

                if negative_samples_processed >= config.MAX_NEGATIVE_SAMPLES:
                    print(f"🎯 达到最大负样本数 {config.MAX_NEGATIVE_SAMPLES}，停止处理。")
                    break

            if negative_samples_processed >= config.MAX_NEGATIVE_SAMPLES:
                break
    if negative_samples_processed >= config.MAX_NEGATIVE_SAMPLES:
        break

# ➤ 合并 CSV
meta_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta.csv"
new_meta_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_with_negative.csv"

if os.path.exists(meta_csv_path):
    existing_df = pd.read_csv(meta_csv_path)
    negative_df = pd.DataFrame(negative_data, columns=["bird_name", "vocalization", "number", "segment_index", "spec_path", "mel_path"])
    combined_df = pd.concat([existing_df, negative_df], ignore_index=True)
else:
    print("⚠️ 未找到原始 CSV，仅使用负样本生成新文件。")
    combined_df = pd.DataFrame(negative_data, columns=["bird_name", "vocalization", "number", "segment_index", "spec_path", "mel_path"])

# ➤ 保存新 CSV
combined_df.to_csv(new_meta_csv_path, index=False, encoding="utf-8")

print(f"\n✅ 负样本处理完成，生成样本数：{negative_samples_processed}")
print(f"📁 线性谱存储路径: {config.OUTPUT_DIR_SPEC}")
print(f"📁 mel谱存储路径: {config.OUTPUT_DIR_MEL}")
print(f"📝 新 CSV 已保存: {new_meta_csv_path}")


Processing Negative Audio Files:  11%|█▏        | 874/7690 [01:04<08:20, 13.61it/s]  


🎯 达到最大负样本数 3500，停止处理。

✅ 负样本处理完成，生成样本数：3500
📁 线性谱存储路径: E:/AMR/DA/Projekt/data/Audio_spec_paperstyle
📁 mel谱存储路径: E:/AMR/DA/Projekt/data/Audio_spec_mel
📝 新 CSV 已保存: E:/AMR/DA/Projekt/data/all_data_meta_with_negative.csv


# Spectogram.npy Filter

In [4]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm

# 📌 文件路径
data_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_with_negative.csv"  # 原始数据
clean_data_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_clean.csv"  # 过滤后数据
blacklist_path = "E:/AMR/DA/Projekt/data/blacklist.txt"  # 黑名单文件

# 读取黑名单（如果存在）
def load_blacklist():
    if os.path.exists(blacklist_path):
        with open(blacklist_path, "r") as f:
            return set(line.strip() for line in f.readlines())
    return set()

# 保存黑名单
def save_blacklist(blacklist):
    with open(blacklist_path, "w") as f:
        for file in blacklist:
            f.write(file + "\n")
    print(f"✅ 黑名单已更新，共 {len(blacklist)} 个无效文件")

# 检查 `.npy` 文件是否包含 NaN/Inf
def find_invalid_npy_files(df, blacklist):
    invalid_files = set(blacklist)  # 先加载已有的黑名单

    print("🔍 开始检查 .npy 文件...")
    for file_path in tqdm(df["path"], desc="扫描无效数据"):
        if file_path in invalid_files:
            continue  # 跳过已知无效文件

        if not os.path.exists(file_path):
            print(f"❌ 文件不存在: {file_path}")
            invalid_files.add(file_path)
            continue

        try:
            spectrogram = np.load(file_path)
            if np.isnan(spectrogram).any() or np.isinf(spectrogram).any():
                print(f"❌ 发现 NaN/Inf: {file_path}")
                invalid_files.add(file_path)
        except Exception as e:
            print(f"⚠️ 读取失败: {file_path}, 错误: {e}")
            invalid_files.add(file_path)

    return invalid_files

# 运行数据过滤
df = pd.read_csv(data_csv_path)
blacklist = load_blacklist()
invalid_files = find_invalid_npy_files(df, blacklist)
save_blacklist(invalid_files)

# 过滤无效数据
df_clean = df[~df["path"].isin(invalid_files)]
df_clean.to_csv(clean_data_csv_path, index=False, encoding="utf-8")

print(f"✅ 已过滤无效数据，生成 {clean_data_csv_path}（保留 {len(df_clean)} 条数据）")


🔍 开始检查 .npy 文件...


KeyError: 'path'

# 修改标签

In [None]:
import pandas as pd
import numpy as np
import cv2
from scipy import ndimage
from tqdm import tqdm  # 进度条，方便观察进度

# 定义 hasBird() 方法
def hasBird(spec, threshold=16):
    img = spec.copy()
    
    # STEP 1: Median blur
    img = cv2.medianBlur(img, 5)
    
    # STEP 2: Median threshold
    col_median = np.median(img, axis=0, keepdims=True)
    row_median = np.median(img, axis=1, keepdims=True)
    img[img < row_median * 1.2] = 0
    img[img < col_median * 1.2] = 0
    img[img > 0] = 1
    
    # STEP 3: Remove isolated pixels
    struct = np.ones((3, 3))
    id_regions, num_ids = ndimage.label(img, structure=struct)
    id_sizes = np.array(ndimage.sum(img, id_regions, range(num_ids + 1)))
    area_mask = (id_sizes == 1)
    img[area_mask[id_regions]] = 0
    
    # STEP 4: Morphological closing
    img = cv2.morphologyEx(img, cv2.MORPH_CLOSE, np.ones((5, 5), np.float32))
    
    # STEP 5: Frequency crop (keeping middle frequency range)
    img = img[8:-85, :]
    
    # STEP 6: Count active rows
    row_max = np.max(img, axis=1)
    row_max = ndimage.binary_dilation(row_max, iterations=2).astype(row_max.dtype)
    rthresh = row_max.sum()
    
    # STEP 7: Apply threshold
    return rthresh >= threshold

# 读取 CSV 文件
csv_path = "E:/AMR/DA/Projekt/data/valid_list_for_zoom006_100_nofreefiled.csv"
output_csv_path = "E:/AMR/DA/Projekt/data/vliad_list_for_zoom006_100_nofreefiled_refine_th12.csv"
counter = 0

df = pd.read_csv(csv_path)

# 遍历 CSV，处理每个频谱图
for idx, row in tqdm(df.iterrows(), total=len(df)):
    # 如果 bird_name 已经是 "Background Noise"，跳过检测
    if row["bird_name"] == "Background Noise":
        continue  

    spec_path = row["path"].replace("\\", "/")  # 兼容 Windows 路径
    try:
        # 读取 .npy 频谱数据
        spec_data = np.load(spec_path)
        
        # 判断是否有鸟声
        if not hasBird(spec_data):
            df.at[idx, "bird_name"] = "Background Noise"
            df.at[idx, "vocalization"] = "none"
            counter += 1
    except Exception as e:
        print(f"⚠️ 无法处理文件: {spec_path}, 错误: {e}")

# 保存新的 CSV 文件
df.to_csv(output_csv_path, index=False, encoding="utf-8")

print(f"✅ 处理完成，已保存至 {output_csv_path}")
print(f"需要修改的数量：{counter}")


In [None]:
import numpy as np
import cv2
from scipy import ndimage
import matplotlib.pyplot as plt

# 可视化中间步骤
def visualize_step(title, img):
    plt.figure(figsize=(8, 4))
    plt.imshow(img, aspect='auto', cmap='magma', origin='lower')
    plt.colorbar()
    plt.title(title)
    plt.show()

# 选择一个 `.npy` 样本路径（你可以手动改成你想测试的文件）
spec_path = "E:/AMR/DA/Projekt/data/Audio_spec\XC864575_seg28.npy"

spec_data = np.load(spec_path)

def hasBird_debug(spec, threshold=16):
    img = spec.copy()

    # STEP 1: 原始频谱图
    visualize_step("Step 1: raw spec", img)
    
    # STEP 2: Median blur
    img = cv2.medianBlur(img, 5)
    visualize_step("Step 2: after Median Blur", img)

    # STEP 3: Median threshold
    col_median = np.median(img, axis=0, keepdims=True)
    row_median = np.median(img, axis=1, keepdims=True)
    img[img < row_median * 1.2] = 0
    img[img < col_median * 1.2] = 0   # baseline = 1.2
    img[img > 0] = 1
    visualize_step("Step 3: after Median threshold", img)

    # STEP 4: Remove isolated pixels
    struct = np.ones((3, 3))
    id_regions, num_ids = ndimage.label(img, structure=struct)
    id_sizes = np.array(ndimage.sum(img, id_regions, range(num_ids + 1)))
    area_mask = (id_sizes == 1)
    img[area_mask[id_regions]] = 0
    visualize_step("Step 4: after Remove isolated pixels", img)

    # STEP 5: Morphological closing
    img = cv2.morphologyEx(img, cv2.MORPH_CLOSE, np.ones((5, 5), np.float32))
    visualize_step("Step 5: after Morphological closing", img)

    # STEP 6: Frequency crop (keeping middle frequency range)
    img = img[8:-85, :]
    visualize_step("Step 6: after Frequency crop ", img)

    # STEP 7: Count active rows
    row_max = np.max(img, axis=1)
    row_max = ndimage.binary_dilation(row_max, iterations=2).astype(row_max.dtype)
    rthresh = row_max.sum()

    print(f"Step 7: Count active rows: {rthresh}")

    # STEP 8: Apply threshold
    result = rthresh >= threshold
    print(f"Step 8: has bird? {result}")
    return result

# 运行调试版 hasBird 方法
hasBird_debug(spec_data, threshold=16)


# ------------------------------------------------------------
# 在此之前的代码一般只用执行一次

# Bird Target filter

In [None]:
import pandas as pd

# 📌 文件路径
input_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_clean.csv"  # 经过 NaN/Inf 过滤和标签修正的文件
# input_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta.csv"  # 经过 NaN/Inf 过滤和标签修正的文件
output_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_allTypes.csv"  # 目标鸟类数据

# 🎯 目标鸟类（仅保留这些类别）
# selected_birds = [
#     "Eurasian Blue tit",
#     "Eurasian Bullfinch",
#     "Great Tit",
#     "Hawfinch",
#     "Hooded Crow",
#     "Stock Dove",
#     "Background Noise",
# ]

selected_birds = [
    "Black-headed Gull",
    "Canada Goose",
    "Carrion Crow",
    "Common Blackbird",
    "Common Chaffinch",
    "Common Kingfisher",
    "Common Redstart",
    "Common Wood Pigeon",
    "Dunnock",
    "Eurasian Blackcap",
    "Eurasian Blue tit",
    "Eurasian Bullfinch",
    "Eurasian Coot",
    "Eurasian Golden Oriole",
    "Eurasian Jay",
    "Eurasian Nuthatch",
    "Eurasian Siskin",
    "Eurasian Treecreeper",
    "Eurasian Wren",
    "European Goldfinch",
    "European Robin",
    "Goldcrest",
    "Great Spotted Woodpecker",
    "Great Tit",
    "Hawfinch",
    "Hooded Crow",
    "Long-tailed Tit",
    "Mallard",
    "Marsh Tit",
    "Redwing",
    "Rook",
    "Short-toed Treecreeper",
    "Stock Dove",
    "Background Noise",
    "Background Noise",
]

# 🎯 vocalization 过滤条件（根据你的实际需求修改）
selected_vocalization = ["Call", "Song", "Alarm call", "Flight call", "Begging call", "none"]  # 假设你只想保留 vocalization 为 'call' 或 'song' 的样本

# 读取数据
df = pd.read_csv(input_csv_path)
print(f"📊 过滤前数据: {len(df)} 条")

# 只保留目标鸟类
df_filtered = df[df["bird_name"].isin(selected_birds)]

# 只保留符合 vocalization 条件的数据
df_filtered = df_filtered[df_filtered["vocalization"].isin(selected_vocalization)]

# 保存筛选后的数据
df_filtered.to_csv(output_csv_path, index=False, encoding="utf-8")

print(f"✅ 目标鸟类和 vocalization 筛选完成，生成 {output_csv_path}（保留 {len(df_filtered)} 条数据）")


# split train and valid

In [None]:
import os
import pandas as pd

# 📌 你的 CSV 文件路径
data_csv_path = "E:/AMR/DA/Projekt/data/train_list_for_zoom006_0324.csv"

# ✅ **检查文件是否存在**
if not os.path.exists(data_csv_path):
    print(f"❌ 错误: 找不到文件 {data_csv_path}")
    exit(1)

# 读取 CSV 数据
df = pd.read_csv(data_csv_path)

# 统计每个类别（bird_name）的样本数量
class_counts = df["bird_name"].value_counts().reset_index()
class_counts.columns = ["类别", "样本数量"]

# 📌 **打印结果**
print("📊 各类别样本数量统计：")
print(class_counts)



In [None]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split

# 📌 文件路径
data_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_allTypes.csv"
train_csv_path = "E:/AMR/DA/Projekt/data/train_list_for_zoom006_0324.csv"
test_csv_path = "E:/AMR/DA/Projekt/data/valid_list_for_zoom006_0324.csv"

# 读取数据
df = pd.read_csv(data_csv_path)
print(f"📊 原始数据: {len(df)} 条")

# ✅ **定义音频编号解析函数**
def get_audio_id(number, bird_name):
    if bird_name == "Background Noise":
        return number
    return number.split("_")[0] if "_" in number else number

# 计算 `audio_id`
df["audio_id"] = df.apply(lambda row: get_audio_id(row["number"], row["bird_name"]), axis=1)

# 🚀 拆分为普通样本和负样本
neg_samples = df[df["bird_name"] == "Background Noise"].copy()
pos_samples = df[df["bird_name"] != "Background Noise"].copy()

# ✅ 按 audio_id 进行训练/测试划分（20% 测试集）
unique_audio_ids = pos_samples["audio_id"].unique()
train_audio_ids, test_audio_ids = train_test_split(unique_audio_ids, test_size=0.2, random_state=2024, shuffle=True)

train_df = pos_samples[pos_samples["audio_id"].isin(train_audio_ids)]
test_df = pos_samples[pos_samples["audio_id"].isin(test_audio_ids)]

# ✅ 限制测试集每类不超过 500 个样本，剩下的回流到训练集（排除碎片泄露）
test_limited_df = []
train_remainder_df = []
test_audio_ids_set = set(test_df["audio_id"])

for bird_name in test_df["bird_name"].unique():
    class_samples = test_df[test_df["bird_name"] == bird_name]

    if len(class_samples) > 500:
        test_limited = class_samples.sample(n=500, random_state=2024)
        remainder = class_samples.drop(test_limited.index)

        # ❗ 移除与 test_limited 相同 audio_id 的碎片，避免回流污染
        remainder = remainder[~remainder["audio_id"].isin(test_limited["audio_id"])]
        train_remainder = remainder
    else:
        test_limited = class_samples
        train_remainder = pd.DataFrame()

    test_limited_df.append(test_limited)
    train_remainder_df.append(train_remainder)

# 合并测试集 & 安全回流训练集
test_df = pd.concat(test_limited_df, ignore_index=True)
train_df = pd.concat([train_df] + train_remainder_df, ignore_index=True)

# ✅ 背景噪声划分
neg_audio_ids = neg_samples["audio_id"].unique()
print("\n🔍 负样本 `audio_id` 统计：")
print(pd.Series(neg_audio_ids).value_counts())

if len(neg_audio_ids) >= 2:
    train_neg_ids, test_neg_ids = train_test_split(neg_audio_ids, test_size=0.2, random_state=2024, shuffle=True)
    train_neg_df = neg_samples[neg_samples["audio_id"].isin(train_neg_ids)]
    test_neg_df = neg_samples[neg_samples["audio_id"].isin(test_neg_ids)]
else:
    print(f"⚠️ 负样本数量不足 ({len(neg_audio_ids)} 个)，全部放入训练集！")
    train_neg_df = neg_samples
    test_neg_df = pd.DataFrame(columns=df.columns)

# ✅ 合并训练 & 测试集
train_df = pd.concat([train_df, train_neg_df], ignore_index=True)
test_df = pd.concat([test_df, test_neg_df], ignore_index=True)

# ✅ 删除辅助列 audio_id
train_df.drop(columns=["audio_id"], inplace=True)
test_df.drop(columns=["audio_id"], inplace=True)

# ✅ 保存 CSV 文件
os.makedirs(os.path.dirname(train_csv_path), exist_ok=True)
train_df.to_csv(train_csv_path, index=False, encoding="utf-8")
test_df.to_csv(test_csv_path, index=False, encoding="utf-8")

print(f"✅ 训练集已保存至: {train_csv_path}, 样本数: {len(train_df)}")
print(f"✅ 测试集已保存至: {test_csv_path}, 样本数: {len(test_df)}")


# cross validation split

In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# 📌 配置
data_csv_path = "E:/AMR/DA/Projekt/data/all_data_meta_filtered.csv"  # 目标鸟类数据
output_dir = "E:/AMR/DA/Projekt/data"  # 交叉验证数据保存目录

# 读取数据
df = pd.read_csv(data_csv_path)
print(f"📊 原始数据: {len(df)} 条")

# 提取音频编号（去掉 segX）
df["audio_id"] = df["number"].apply(lambda x: x.split("_")[0] if "_" in x else x)

# 创建输出文件夹
os.makedirs(output_dir, exist_ok=True)

# 🚀 **按音频编号进行划分（确保所有片段在同一个集合）**
unique_audio_ids = df["audio_id"].unique()
part1_audio_ids, part2_audio_ids = train_test_split(unique_audio_ids, test_size=0.5, random_state=2024, shuffle=True)

# **分配数据**
part1_df = df[df["audio_id"].isin(part1_audio_ids)].drop(columns=["audio_id"])
part2_df = df[df["audio_id"].isin(part2_audio_ids)].drop(columns=["audio_id"])

# **保存文件**
part1_csv_path = os.path.join(output_dir, "crossval_part1.csv")
part2_csv_path = os.path.join(output_dir, "crossval_part2.csv")

part1_df.to_csv(part1_csv_path, index=False, encoding="utf-8")
part2_df.to_csv(part2_csv_path, index=False, encoding="utf-8")

print(f"✅ 交叉验证数据集 1: {part1_csv_path}, 样本数: {len(part1_df)}")
print(f"✅ 交叉验证数据集 2: {part2_csv_path}, 样本数: {len(part2_df)}")
print("\n🎯 交叉验证数据集划分完成！")


# train and valid distribution

In [None]:
import pandas as pd
train_df = pd.read_csv("E:/AMR/DA/Projekt/data/train_list_for_zoom006_0315.csv")
valid_df = pd.read_csv("E:/AMR/DA/Projekt/data/valid_list_for_zoom006_0315.csv")

print("训练集类别分布:")
print(train_df["bird_name"].value_counts())

print("\n测试集类别分布:")
print(valid_df["bird_name"].value_counts())

train_df_refine = pd.read_csv("E:/AMR/DA/Projekt/data/train_list_for_zoom006_100_nofreefiled_refine_th12.csv")
valid_df_refine = pd.read_csv("E:/AMR/DA/Projekt/data/valid_list_for_zoom006_100_nofreefiled_refine_th12.csv")

print("训练集类别分布refine:")
print(train_df_refine["bird_name"].value_counts())
print("\n测试集类别分布refine:")
print(valid_df_refine["bird_name"].value_counts())


In [None]:
import os
import collections

# 📌 你的音频数据存放的根目录（修改为你的路径）
audio_root = "E:/AMR/DA/Projekt/data/Audio_files"

# 📌 统计音频编号出现的文件夹
xc_file_map = collections.defaultdict(set)

# 🚀 遍历所有 vocalization 文件夹
for vocalization in os.listdir(audio_root):
    vocalization_path = os.path.join(audio_root, vocalization)
    
    if os.path.isdir(vocalization_path):  # 确保是文件夹
        for file in os.listdir(vocalization_path):
            if file.endswith(".wav") and file.startswith("XC"):
                xc_id = file.split(".")[0]  # 获取音频编号，例如 XC123456
                xc_file_map[xc_id].add(vocalization)  # 记录该音频在哪些 vocalization 中出现

# 🚀 找到重复出现的音频
duplicates = {xc: v for xc, v in xc_file_map.items() if len(v) > 1}

# 📌 按照出现的 vocalization 数量排序
sorted_duplicates = sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)

# ✅ 打印结果
print(f"🔍 共有 {len(sorted_duplicates)} 个音频文件出现在多个 vocalization 目录中:\n")
for xc_id, voc_types in sorted_duplicates:
    print(f"{xc_id}: 出现在 {len(voc_types)} 个 vocalization 中 → {', '.join(voc_types)}")

# 📌 保存结果到 CSV
import pandas as pd

df = pd.DataFrame([(xc_id, len(voc_types), ", ".join(voc_types)) for xc_id, voc_types in sorted_duplicates],
                  columns=["XC_ID", "Vocalization Count", "Vocalization Types"])
df.to_csv("E:/AMR/DA/Projekt/data/duplicate_audio_files.csv", index=False, encoding="utf-8-sig")

print(f"\n✅ 统计完成，结果已保存至 `duplicate_audio_files.csv`")


In [None]:
import pandas as pd

# 📌 你的 CSV 文件路径
csv_path = "E:/AMR/DA/Projekt/data/train_meta_100.csv"
output_csv_path = "E:/AMR/DA/Projekt/data/train_meta_100_deduplicated.csv"

# ✅ 读取 CSV
df = pd.read_csv(csv_path)

# ✅ 提取 `XC` 编号
df["XC_ID"] = df["number"].apply(lambda x: x.split("_")[0] if "_" in x else x)

# ✅ **去重（保留第一次出现的 XC_ID）**
df_deduplicated = df.drop_duplicates(subset="XC_ID", keep="first")

# ✅ **删除 `XC_ID` 辅助列**
df_deduplicated = df_deduplicated.drop(columns=["XC_ID"])

# ✅ 保存去重后的 CSV
df_deduplicated.to_csv(output_csv_path, index=False, encoding="utf-8-sig")

print(f"✅ 去重完成！原始数据: {len(df)} 条 → 处理后: {len(df_deduplicated)} 条")
print(f"📄 结果已保存至: {output_csv_path}")
