<a href="https://colab.research.google.com/github/kohgeonbu/kohgeonbu/blob/main/annomal_mix.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1) Mount Google Drive so the dataset folders under /content/drive are reachable
from google.colab import drive
drive.mount('/content/drive')

# 2) Install the packages the mixer needs (IPython shell magic; quiet mode)
!pip -q install numpy librosa soundfile tqdm

# 3) Mixer code (simple version)
import os, csv, random, numpy as np, librosa, soundfile as sf
from tqdm import tqdm

def load_mono(path, target_sr=None):
    """Load an audio file as a mono float32 signal.

    Args:
        path: Audio file to read (passed straight to ``librosa.load``).
        target_sr: Sample rate to request from ``librosa.load``; a falsy
            value means the rate reported by librosa is returned instead.

    Returns:
        A ``(samples, sample_rate)`` tuple, where ``samples`` is a float32
        array and ``sample_rate`` is ``target_sr`` when it is truthy,
        otherwise the rate returned by librosa.
    """
    samples, loaded_sr = librosa.load(path, sr=target_sr, mono=True)
    effective_sr = target_sr if target_sr else loaded_sr
    return samples.astype(np.float32), effective_sr

def rms(x):
    """Return the root-mean-square level of ``x`` as a Python float.

    A small constant (1e-12) is added to the mean power before the square
    root, so an all-zero signal yields about 1e-6 rather than exactly 0.
    """
    mean_power = np.mean(np.square(x))
    return float(np.sqrt(mean_power + 1e-12))

def apply_fade(x, sr, fade_ms=15.0):
    """Apply a linear fade-in and fade-out of ``fade_ms`` milliseconds.

    Args:
        x: 1-D signal array.
        sr: Sample rate in Hz, used to convert ``fade_ms`` to samples.
        fade_ms: Length of each ramp in milliseconds.

    Returns:
        A faded copy of ``x``. If the ramp is zero samples long, or the two
        ramps would not fit strictly inside the signal, ``x`` itself is
        returned unmodified (not a copy).
    """
    total = len(x)
    ramp_len = int(sr * fade_ms / 1000.0)

    if ramp_len > 0 and ramp_len * 2 < total:
        ramp_up = np.linspace(0, 1, ramp_len, dtype=np.float32)
        faded = x.copy()
        faded[:ramp_len] *= ramp_up
        # The fade-out is the fade-in ramp played backwards.
        faded[-ramp_len:] *= ramp_up[::-1]
        return faded

    return x

def normalize_peak(x, target_dbfs=-1.0):
    """Attenuate ``x`` so its absolute peak does not exceed ``target_dbfs``.

    Args:
        x: Signal array.
        target_dbfs: Peak ceiling in dB relative to full scale (1.0).

    Returns:
        ``x`` scaled down to the ceiling when its peak is above it;
        otherwise ``x`` itself, unchanged (signals are never amplified).
    """
    ceiling = 10 ** (target_dbfs / 20.0)
    peak = float(np.max(np.abs(x)) + 1e-12)
    if peak <= ceiling:
        return x
    return x * (ceiling / peak)

def scale_to_snr(bg_seg, fg_seg, snr_db):
    """Scale ``fg_seg`` so that power(bg_seg) / power(fg_seg) equals ``snr_db``.

    The background is treated as the "signal" and the foreground as the
    "noise": a larger ``snr_db`` produces a quieter foreground.

    Args:
        bg_seg: Background samples that the foreground will overlap.
        fg_seg: Foreground samples to be scaled.
        snr_db: Desired background-to-foreground power ratio in dB.

    Returns:
        The scaled foreground, same shape as ``fg_seg``. An all-zero array
        shaped like ``fg_seg`` is returned when either segment is empty or
        has mean power below 1e-12 (i.e. is effectively silent).
    """
    # Empty segments have no defined power; np.mean would return NaN.
    if np.size(bg_seg) == 0 or np.size(fg_seg) == 0:
        return np.zeros_like(fg_seg)

    # Mean power is computed directly rather than as rms()**2: rms() adds
    # 1e-12 before the square root, which keeps the result at or above the
    # silence threshold and makes the check below unreliable.
    p_bg = float(np.mean(np.square(bg_seg)))
    p_fg = float(np.mean(np.square(fg_seg)))
    if p_bg < 1e-12 or p_fg < 1e-12:
        return np.zeros_like(fg_seg)

    target_p_fg = p_bg / (10 ** (snr_db / 10.0))
    # A Python float gain keeps the dtype of fg_seg (a NumPy float64 scalar
    # would promote float32 audio to float64 under NumPy 2 promotion rules).
    scale = float(np.sqrt(max(target_p_fg, 1e-12) / (p_fg + 1e-12)))
    return fg_seg * scale

def collect_audio_files(root, exts=(".wav",".flac",".mp3")):
    """Recursively list audio files under ``root``, sorted by full path.

    Args:
        root: Directory to walk.
        exts: Filename suffixes to accept; they are matched against the
            lower-cased filename, so matching is case-insensitive for
            lower-case suffixes.

    Returns:
        A sorted list of joined paths; empty when nothing matches.
    """
    return sorted(
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(root)
        for name in names
        if name.lower().endswith(exts)
    )

def mix_dataset(bg_dir, anom_dir, out_dir, target_sr=16000,
                snr_list=(0,3,6,10), mix_per_bg=2, onset_min=0.2, onset_max=6.0,
                split_ratio=0.8, do_split=True, seed=42):
    """Overlay anomaly clips onto background recordings and log the results.

    For every background file, ``mix_per_bg`` mixtures are attempted: a random
    anomaly file is loaded, truncated to fit, faded, scaled to a randomly
    chosen SNR relative to the overlapped background, added at a random
    onset, peak-limited to -1 dBFS and written as 16-bit PCM WAV. One row
    per written file is appended to ``<out_dir>/metadata.csv``.

    Args:
        bg_dir: Directory searched recursively for background audio.
        anom_dir: Directory searched recursively for anomaly audio.
        out_dir: Output root; created if missing. Mixtures go into a
            sub-directory per subset ("train"/"test", or "all").
        target_sr: Sample rate passed to ``load_mono`` for backgrounds; the
            rate it returns is reused for the anomaly and the output file.
        snr_list: Sequence of candidate SNR values in dB (background power
            over anomaly power, see ``scale_to_snr``).
        mix_per_bg: Number of mixtures attempted per background file.
        onset_min: Earliest anomaly onset in seconds.
        onset_max: Latest anomaly onset in seconds (further limited to
            0.2 s before the end of the background).
        split_ratio: Fraction of background files assigned to "train".
        do_split: When false, all files go into a single "all" subset.
        seed: Seed for the private ``random.Random`` used for every choice.

    Raises:
        AssertionError: If no background or no anomaly files are found.

    Notes:
        Backgrounds that are not longer than ``onset_min`` and anomaly clips
        with zero samples are skipped with a warning, because no anomaly
        could actually be mixed in and the output would still be labelled 1.
        Output names are built from base names, SNR and onset only, so inputs
        sharing a base name in different folders can map to the same file.
    """
    os.makedirs(out_dir, exist_ok=True)
    meta_path = os.path.join(out_dir, "metadata.csv")
    rng = random.Random(seed)
    # Side effect on NumPy's global RNG; nothing in this function draws from it.
    np.random.seed(seed)

    bg_files = collect_audio_files(bg_dir)
    anom_files = collect_audio_files(anom_dir)
    assert bg_files, f"No background files in {bg_dir}"
    assert anom_files, f"No anomaly files in {anom_dir}"

    def split(lst, ratio):
        # Shuffle indices (not the list) so bg_files itself keeps its order.
        idx = list(range(len(lst)))
        rng.shuffle(idx)
        cut = int(len(lst) * ratio)
        return [lst[i] for i in idx[:cut]], [lst[i] for i in idx[cut:]]

    subsets = [("all", bg_files)]
    if do_split:
        tr, te = split(bg_files, split_ratio)
        subsets = [("train", tr), ("test", te)]

    with open(meta_path, "w", newline="", encoding="utf-8") as f:
        wr = csv.writer(f)
        wr.writerow(["subset","mixed_path","bg_file","anom_file","sr","snr_db","onset_sec","duration_sec","label"])

        for subset, bgs in subsets:
            subdir = os.path.join(out_dir, subset)
            os.makedirs(subdir, exist_ok=True)
            for bgp in tqdm(bgs, desc=f"{subset} bg"):
                try:
                    bg, sr = load_mono(bgp, target_sr=target_sr)
                except Exception as e:
                    # Best-effort batch job: report the unreadable file and go on.
                    print("[WARN] skip bg:", bgp, e)
                    continue

                # The earliest possible onset must fall inside the background;
                # otherwise nothing would be mixed but a label=1 row would be
                # written.
                if len(bg) <= int(onset_min * sr):
                    print("[WARN] skip bg (shorter than onset_min):", bgp)
                    continue

                for _ in range(mix_per_bg):
                    ap = rng.choice(anom_files)
                    try:
                        fg, _fg_sr = load_mono(ap, target_sr=sr)
                    except Exception as e:
                        print("[WARN] skip fg:", ap, e)
                        continue

                    if len(fg) == 0:
                        # An empty clip cannot contribute an anomaly.
                        print("[WARN] skip fg (empty):", ap)
                        continue

                    # Leave at least 0.2 s after the onset when the background
                    # is long enough, but never go below onset_min.
                    max_onset = max(onset_min, min(onset_max, (len(bg) / sr) - 0.2))
                    onset_sec = rng.uniform(onset_min, max_onset)
                    # Clamp defensively so at least one sample overlaps.
                    onset = min(int(onset_sec * sr), len(bg) - 1)
                    avail = len(bg) - onset

                    # Truncate the anomaly to what fits, then fade its edges.
                    fg_seg = apply_fade(fg[:avail], sr, fade_ms=15.0)
                    bg_seg = bg[onset:onset + len(fg_seg)]
                    snr_db = rng.choice(snr_list)
                    fg_scaled = scale_to_snr(bg_seg, fg_seg, snr_db)

                    y = bg.copy()
                    y[onset:onset + len(fg_scaled)] = bg_seg + fg_scaled
                    y = normalize_peak(y, -1.0)

                    bg_base = os.path.splitext(os.path.basename(bgp))[0]
                    fg_base = os.path.splitext(os.path.basename(ap))[0]
                    out_name = f"{bg_base}__{fg_base}__snr{snr_db}dB__on{onset_sec:.2f}.wav"
                    out_path = os.path.join(subdir, out_name)
                    sf.write(out_path, y, sr, subtype="PCM_16")

                    wr.writerow([subset,out_path,bgp,ap,sr,snr_db,round(onset_sec,3),round(len(fg_scaled)/sr,3),1])

# 4) Google Drive paths (fixed to the requested locations)
bg_dir = "/content/drive/MyDrive/normal"
anom_dir = "/content/drive/MyDrive/abnormal"
out_dir = "/content/drive/MyDrive/mixed_out"

# 5) Run the mixer (adjust only the parameters below if needed)
mix_dataset(
    bg_dir=bg_dir,
    anom_dir=anom_dir,
    out_dir=out_dir,
    # Change to 48000 (or any other rate) to keep a higher sample rate;
    # 16 kHz is the currently recommended setting.
    target_sr=16000,
    snr_list=(0, 3, 6, 10),
    # Two mixtures are generated per normal (background) file.
    mix_per_bg=2,
    onset_min=0.2,
    onset_max=6.0,
    split_ratio=0.8,
    do_split=True,
    seed=42,
)

print("Done! Check:", out_dir)


Mounted at /content/drive


train bg: 100%|██████████| 100/100 [01:00<00:00,  1.66it/s]
test bg: 100%|██████████| 25/25 [00:04<00:00,  5.13it/s]

Done! Check: /content/drive/MyDrive/mixed_out



