<a href="https://colab.research.google.com/github/thoongthuck/R-E/blob/main/%EB%8D%B0%EC%9D%B4%ED%84%B0_%EC%A0%84%EC%B2%98%EB%A6%AC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 피처 추출

K-pop 데이터셋 피처 추출

In [None]:
!git clone https://github.com/EX3exp/Kpop-lyric-datasets.git

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
import json
import os

# Load every monthly Melon chart entry (ranks 1-50, 2000-2023) from the cloned
# Kpop-lyric-datasets repository into a single DataFrame.
song_records = []  # one dict per successfully parsed chart entry
for year in range(2000, 2024):
  for month in range(1, 13):
    for rank in range(1, 51):
      file_path = f'/content/Kpop-lyric-datasets/melon/monthly-chart/melon-{year}/melon-{year}-{month:02d}/melon-monthly_{year}-{month:02d}_{rank:02d}.json'

      try:
        with open(file_path, 'r', encoding='utf-8') as f:
          song_json = json.load(f)
        song_records.append({
          "artist": song_json['artist'],
          "title": song_json['song_name'],
          "lyric": song_json['lyrics']['lines'],
        })
      except FileNotFoundError:
        # Not every year/month/rank combination exists in the dataset.
        continue
      except (ValueError, KeyError, TypeError) as e:
        # Malformed JSON (ValueError) or an entry missing the expected keys
        # must not abort the whole load; report it and move on.
        print(f"[skip] {file_path}: {e!r}")
        continue

# Explicit columns keep 'title'/'artist' available even when nothing was loaded.
song_df = pd.DataFrame(song_records, columns=["artist", "title", "lyric"])
song_df.to_csv('lyric_df.csv')

In [None]:
import os, re, sys, time, argparse
from typing import Dict, Iterable, List, Optional, Tuple
import requests
import numpy as np
import librosa
import pandas as pd

API = "https://api.deezer.com"

In [None]:
def sanitize(name):
    """Return ``name`` stripped of characters that are unsafe in file names.

    Removes ``\\ / : * ? " < > |`` and ASCII control characters, trims
    surrounding whitespace, and caps the result at 200 characters.
    """
    cleaned = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '', name)
    trimmed = cleaned.strip()
    return trimmed[:200]

def ensure_dir(path):
    """Create directory ``path`` (including parents) if it is non-empty and missing."""
    if not path:
        return
    if os.path.exists(path):
        return
    os.makedirs(path, exist_ok=True)

def fetch_json(url, params=None, retry=3, timeout = 15):
    """GET ``url`` and return its decoded JSON body.

    Makes up to ``retry`` attempts. After every unsuccessful attempt (request
    error, non-200 status, or a body that is not JSON) a warning is written to
    stderr and the function sleeps ``attempt`` seconds (1, 2, 3, ...).

    Returns:
        The parsed JSON value, or ``{}`` when no attempt succeeded.
    """
    for attempt in range(1, retry + 1):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            if r.status_code != 200:
                print(f"[warn] HTTP {r.status_code} {r.url}", file=sys.stderr)
            else:
                try:
                    return r.json()
                except ValueError:
                    print(f"[debug] non-JSON: {r.text[:200]}", file=sys.stderr)
        except requests.RequestException as e:
            print(f"[warn] {e}", file=sys.stderr)
        # Linear back-off before the next attempt.
        time.sleep(1.0 * attempt)
    return {}

In [None]:
def track_item_from_id(tid: int) -> Optional[dict]:
    """Fetch a single track by id; return ``None`` if the payload is empty or lacks an ``id``."""
    payload = fetch_json(f"{API}/track/{tid}")
    if payload and payload.get("id"):
        return payload
    return None

def track_items_from_album(aid: int) -> Iterable[dict]:
    """Yield each entry of ``tracks.data`` from the album payload (nothing if absent)."""
    album = fetch_json(f"{API}/album/{aid}")
    yield from album.get("tracks", {}).get("data", [])

def track_items_from_playlist(pid: int) -> Iterable[dict]:
    """Yield every track of a playlist, following the ``next`` links page by page.

    Nothing is yielded when the playlist payload has no ``tracks.data`` list.
    """
    playlist = fetch_json(f"{API}/playlist/{pid}")
    tracks = playlist.get("tracks", {})
    if "data" not in tracks:
        return

    yield from tracks["data"]

    # Keep requesting follow-up pages until a page has no "next" URL.
    cursor = tracks.get("next")
    while cursor:
        page = fetch_json(cursor)
        yield from page.get("data", [])
        cursor = page.get("next")

def track_items_from_artist_top(artist_id: int, limit: int = 50) -> Iterable[dict]:
    """Yield the entries of ``data`` from the artist's ``/top`` endpoint, requesting ``limit`` items."""
    top = fetch_json(f"{API}/artist/{artist_id}/top", params={"limit": limit})
    yield from top.get("data", [])

def search_tracks(query: str, limit: int = 5) -> Iterable[dict]:
    """Yield the entries of ``data`` returned by the ``/search`` endpoint for ``query``."""
    found = fetch_json(f"{API}/search", params={"q": query, "limit": limit})
    yield from found.get("data", [])

In [None]:
def choose_filename(t: dict, outdir: str) -> str:
    """Build the destination path of a track's preview file inside ``outdir``.

    The name has the form ``"<artist> - <title> (preview).mp3"``. Only the stem
    is sanitised and truncated, so the ``.mp3`` extension survives even for
    very long artist/title combinations (total name length stays <= 200).

    Args:
        t: Track dict; ``artist.name``, ``title`` and ``id`` are read if present.
        outdir: Directory the file will be placed in.

    Returns:
        ``outdir`` joined with the sanitised file name.
    """
    # `artist` may be present but None, so fall back to an empty dict explicitly.
    artist = (t.get("artist") or {}).get("name", "Unknown Artist")
    title  = t.get("title", f"track-{t.get('id','unknown')}")
    ext = ".mp3"
    stem = sanitize(f"{artist} - {title} (preview)")[: 200 - len(ext)]
    return os.path.join(outdir, stem + ext)

def download_preview(t: dict, outdir: str, overwrite: bool = False) -> Optional[str]:
    """Download a track's preview clip into ``outdir``.

    The data is streamed into a temporary ``.part`` file and only renamed to
    the final name once the download completed, so an interrupted transfer can
    never be mistaken for a finished file on a later run.

    Args:
        t: Track dict; its ``preview`` URL and ``id`` are read.
        outdir: Target directory (created if missing).
        overwrite: Re-download even if the destination file already exists.

    Returns:
        Path of the (existing or newly saved) file, or ``None`` when the track
        has no preview URL or the download failed.
    """
    url = t.get("preview")
    tid = t.get("id")
    if not url:
        print(f"[skip] preview 없음 (track id: {tid})", file=sys.stderr)
        return None
    ensure_dir(outdir)
    dest = choose_filename(t, outdir)
    if os.path.exists(dest) and not overwrite:
        print(f"[keep] 이미 있음: {os.path.basename(dest)}")
        return dest

    tmp = dest + ".part"
    try:
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(tmp, "wb") as f:
                for ch in r.iter_content(8192):
                    if ch:
                        f.write(ch)
        # Publish the file only after every chunk was written.
        os.replace(tmp, dest)
        print(f"[ok] saved: {os.path.basename(dest)}")
        return dest
    except (requests.RequestException, OSError) as e:
        print(f"[err] 다운로드 실패: {e}", file=sys.stderr)
        # Best-effort removal of the incomplete temporary file.
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except OSError:
            pass
        return None

In [None]:
import numpy as np
from typing import Tuple

# Mode: Krumhansl–Kessler
_PITCHES = np.array(["C","C#","D","D#","E","F","F#","G","G#","A","A#","B"])
_KK_MAJOR = np.array([6.35,2.23,3.48,2.33,4.38,4.09,2.52,5.19,2.39,3.66,2.29,2.88], dtype=float)
_KK_MINOR = np.array([6.33,2.68,3.52,5.38,2.60,3.53,2.54,4.75,3.98,2.69,3.34,3.17], dtype=float)

def _best_key_from_chroma_mean(chroma_mean: np.ndarray) -> Tuple[str, str, float]:
    def z(x):
        x = np.asarray(x, float)
        return (x - x.mean()) / (x.std() + 1e-8)

    cm = z(chroma_mean)
    best_corr, best_key, best_mode = -np.inf, None, None

    for shift in range(12):
        maj = np.roll(_KK_MAJOR, shift)
        minr = np.roll(_KK_MINOR, shift)
        cmaj = np.corrcoef(cm, z(maj))[0,1]
        cmin = np.corrcoef(cm, z(minr))[0,1]
        if cmaj > best_corr:
            best_corr = cmaj
            best_mode = "major"
        if cmin > best_corr:
            best_corr = cmin
            best_mode = "minor"

    return float(best_corr)

In [None]:
def extract_features(audio_path: str, sr_target: int = 22050):
    """Compute tempo, loudness, MFCC, chroma and key-correlation features for one audio file.

    Args:
        audio_path: Path of the audio file to analyse.
        sr_target: Sample rate the audio is resampled to on load.

    Returns:
        Dict with scalar entries ``tempo_bpm``, ``rms_mean``, ``rms_std``,
        ``rms_db_mean``, ``rms_db_std``, ``mode_conf`` and array entries
        ``mfcc_mean``, ``mfcc_std`` (13 coefficients each) and ``chroma_mean``.
    """
    y, sr = librosa.load(audio_path, sr=sr_target, mono=True)

    # One HPSS pass yields both components; calling harmonic() and percussive()
    # separately would run the same decomposition twice.
    y_h, y_p = librosa.effects.hpss(y)

    tempo, _ = librosa.beat.beat_track(y=y_p, sr=sr)
    # Depending on the librosa version, tempo may be a scalar or a one-element
    # array; atleast_1d accepts both.
    tempo_bpm = float(np.atleast_1d(tempo)[0])

    rms = librosa.feature.rms(y=y).flatten()
    rms_db = librosa.amplitude_to_db(rms, ref=1.0)

    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    mfcc_mean = mfcc.mean(axis=1)
    mfcc_std  = mfcc.std(axis=1)

    # Chroma is taken from the harmonic part only.
    chroma = librosa.feature.chroma_cqt(y=y_h, sr=sr)
    chroma_mean = chroma.mean(axis=1)

    corr = _best_key_from_chroma_mean(chroma_mean)

    return {
        "tempo_bpm": tempo_bpm,
        "rms_mean": float(rms.mean()),
        "rms_std": float(rms.std()),
        "rms_db_mean": float(rms_db.mean()),
        "rms_db_std": float(rms_db.std()),
        "mfcc_mean": mfcc_mean,
        "mfcc_std":  mfcc_std,
        "chroma_mean": chroma_mean,
        "mode_conf": corr
    }

In [None]:
def features_to_row(meta: dict, feats: Dict[str, object], path: str) -> dict:
    """Flatten a feature dict into one flat row of scalar columns.

    Scalar features are copied as-is; ``mfcc_mean`` / ``mfcc_std`` become
    ``mfcc_mean_<i>`` / ``mfcc_std_<i>`` and ``chroma_mean`` becomes one
    ``chroma_<pitch>`` column per entry of ``_PITCHES``. ``meta`` and ``path``
    are accepted but not used.
    """
    scalar_keys = ("tempo_bpm", "rms_mean", "rms_std", "rms_db_mean", "rms_db_std", "mode_conf")
    row = {key: feats[key] for key in scalar_keys}

    # MFCC statistics: one column per coefficient index.
    mfcc_means = np.asarray(feats["mfcc_mean"]).tolist()
    row.update({f"mfcc_mean_{i}": float(v) for i, v in enumerate(mfcc_means)})
    mfcc_stds = np.asarray(feats["mfcc_std"]).tolist()
    row.update({f"mfcc_std_{i}": float(v) for i, v in enumerate(mfcc_stds)})

    # Chroma means: one column per pitch-class name.
    chroma_means = np.asarray(feats["chroma_mean"]).tolist()
    row.update({f"chroma_{pitch}": float(v) for pitch, v in zip(_PITCHES, chroma_means)})
    return row

In [None]:
def collect_items(args) -> List[dict]:
    """Resolve the track source selected in ``args`` into a list of track dicts.

    Sources are checked in the order track id, album id, playlist id, artist
    id, search query; the first one set wins. With no source the process exits
    with status 2. When ``title``, ``artist`` and ``search`` are all set, the
    result is narrowed to the single best match (if one is found).
    """
    if args.track_id:
        single = track_item_from_id(args.track_id)
        items: List[dict] = [single] if single else []
    elif args.album_id:
        items = [*track_items_from_album(args.album_id)]
    elif args.playlist_id:
        items = [*track_items_from_playlist(args.playlist_id)]
    elif args.artist_id:
        items = [*track_items_from_artist_top(args.artist_id, limit=args.limit)]
    elif args.search:
        items = [*search_tracks(args.search, limit=args.limit)]
    else:
        print("[err] 소스 지정 x", file=sys.stderr)
        sys.exit(2)

    narrow_to_best = args.title and args.artist and args.search
    if narrow_to_best:
        best = select_best_match(items, args.title, args.artist)
        if best:
            return [best]
    return items

def pipeline(args) -> pd.DataFrame:
    """Download previews for the tracks selected by ``args`` and extract their features.

    Returns a DataFrame with one row per successfully analysed track, each row
    carrying the feature columns plus ``title`` and ``artist`` from ``args``.
    The frame is empty when no track could be processed.
    """
    ensure_dir(args.outdir)
    items = collect_items(args)
    if not items:
        print("[warn] 처리할 트랙이 없음", file=sys.stderr)
        return pd.DataFrame()

    # A falsy limit (0 / None) means "no cap".
    max_items = args.limit if args.limit else None

    rows = []
    for t in items[:max_items]:
        if not t.get("preview"):
            print(f"[skip] preview 없음 (title={t.get('title')})", file=sys.stderr)
            continue

        mp3 = download_preview(t, args.outdir, overwrite=args.overwrite)
        if not mp3:
            continue

        try:
            feats = extract_features(mp3, sr_target=args.sr)
            row = features_to_row(t, feats, mp3)
            row['title'] = args.title
            row['artist'] = args.artist
            rows.append(row)
            print('추출 성공')
        except Exception as e:
            print(f"[err] failed: {e}", file=sys.stderr)

    return pd.DataFrame(rows)

In [None]:
def build_query_from_title_artist(title: str, artist: str) -> str:
    """Build an ``artist:"..." track:"..."`` search query from a title and an artist.

    Double quotes inside either value are backslash-escaped and surrounding
    whitespace is trimmed.
    """
    def _escape(text: str) -> str:
        return text.replace('"', '\\"').strip()

    t, a = _escape(title), _escape(artist)
    return f'artist:"{a}" track:"{t}"'


def _normalize_text(s: str) -> str:
    s = s or ""
    s = s.lower()
    s = re.sub(r"\(.*?\)|\[.*?\]", "", s)
    s = re.sub(r"\bfeat(?:uring)?\.?\b.*", "", s)
    s = re.sub(r"[^a-z0-9가-힣]+", "", s)
    return s.strip()

def select_best_match(items: List[dict], title: str, artist: str) -> Optional[dict]:
    """Pick the track in ``items`` that best matches ``title`` and ``artist``.

    Each candidate scores 2 for an exact normalised title match (1 if the
    wanted title is merely contained in it), the same for the artist, and 0.5
    extra for having a preview. The first candidate with the highest score is
    returned; ``None`` is returned for an empty list.
    """
    if not items:
        return None

    nt = _normalize_text(title)
    na = _normalize_text(artist)

    def _field_points(candidate: str, wanted: str) -> float:
        if candidate == wanted:
            return 2.0
        if wanted and wanted in candidate:
            return 1.0
        return 0.0

    def _score(t: dict) -> float:
        tt = _normalize_text(t.get("title", ""))
        ta = _normalize_text((t.get("artist") or {}).get("name", ""))
        points = 0.0
        points += _field_points(tt, nt)
        points += _field_points(ta, na)
        if t.get("preview"):
            points += 0.5
        return points

    # max() keeps the first of several equally scored candidates.
    return max(items, key=_score)

In [None]:
def main(chart, args=None):
    """Extract preview-audio features for every song in ``chart`` and append them to a CSV.

    For each row a search query is built from the ``title`` and ``artist``
    columns and ``pipeline`` is run. Results are appended to the CSV every four
    songs and once more at the end, writing the header only if the file did not
    exist before.

    Args:
        chart: DataFrame with ``title`` and ``artist`` columns.
        args: Optional list of CLI-style option strings. ``None`` makes
            argparse read ``sys.argv``; unknown options are ignored.

    Returns:
        DataFrame with every feature row written during this call, or an empty
        DataFrame when nothing could be extracted.
    """
    ap = argparse.ArgumentParser(description="Deezer 미리듣기 → MFCC/Chroma/Mode/BPM/RMS + DataFrame/CSV")
    src = ap.add_mutually_exclusive_group(required=False)
    src.add_argument("--track-id", type=int)
    src.add_argument("--album-id", type=int)
    src.add_argument("--playlist-id", type=int)
    src.add_argument("--artist-id", type=int)
    src.add_argument("--search", type=str)

    ap.add_argument("--title", type=str)
    ap.add_argument("--artist", type=str)
    ap.add_argument("-o", "--outdir", default="previews")
    ap.add_argument("-n", "--limit", type=int, default=10)
    ap.add_argument("--overwrite", action="store_true")
    ap.add_argument("--sr", type=int, default=22050)
    ap.add_argument("--csv", type=str, default="deezer_features_total.csv")
    ap.add_argument("--csv-encoding", type=str, default="utf-8-sig")

    parsed, _unknown = ap.parse_known_args(args)

    csv_path = parsed.csv
    written = os.path.exists(csv_path)  # the header is only written to a new file

    pending = []  # frames not yet appended to the CSV
    saved = []    # frames already appended during this call

    def _flush():
        """Append all pending frames to the CSV and remember them as saved."""
        nonlocal written
        merged = pd.concat(pending, ignore_index=True)
        merged.to_csv(csv_path, mode='a', header=not written, index=False, encoding=parsed.csv_encoding)
        written = True
        saved.append(merged)
        pending.clear()

    for idx in range(len(chart)):
        title = chart['title'].iloc[idx]
        artist = chart['artist'].iloc[idx]
        print(f"\n [{idx+1}/{len(chart)}] {artist} - {title}")

        try:
            # Query building is inside the try so that one bad row (e.g. a
            # non-string title) is reported instead of aborting the whole run.
            parsed.title = title
            parsed.artist = artist
            parsed.search = build_query_from_title_artist(title, artist)

            df = pipeline(parsed)
            if df is not None and not df.empty:
                pending.append(df)
            else:
                print(f"[skip] {artist} - {title}: 분석 결과 없음")
        except Exception as e:
            print(f"[err] {artist} - {title} 실패: {e}")

        # auto save
        if (idx + 1) % 4 == 0 and pending:
            _flush()
            print(f"[autosave] {idx+1}곡까지 저장")

    # save whatever is left after the last autosave
    if pending:
        _flush()

    # Decide on everything saved in this call, not only on the last batch:
    # the final batch may already have been flushed by the autosave above.
    if saved:
        print(f"\n전체 {len(chart)}곡 피처 저장")
        return pd.concat(saved, ignore_index=True)

    print("\n[warn] 트랙 데이터 없음")
    return pd.DataFrame()


In [None]:
# Run feature extraction for every row of song_df; main() reads its 'title' and 'artist' columns.
main(song_df)

DEAM 피처 추출

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os, re, sys, time, argparse
from typing import Dict, Iterable, List, Optional, Tuple
import requests
import numpy as np
import librosa
import pandas as pd

In [None]:
def ensure_dir(path):
    """Create directory ``path`` (including parents) if it is non-empty and missing."""
    if not path:
        return
    if os.path.exists(path):
        return
    os.makedirs(path, exist_ok=True)

In [None]:
from typing import Tuple

# Mode: Krumhansl–Kessler
_PITCHES = np.array(["C","C#","D","D#","E","F","F#","G","G#","A","A#","B"])
_KK_MAJOR = np.array([6.35,2.23,3.48,2.33,4.38,4.09,2.52,5.19,2.39,3.66,2.29,2.88], dtype=float)
_KK_MINOR = np.array([6.33,2.68,3.52,5.38,2.60,3.53,2.54,4.75,3.98,2.69,3.34,3.17], dtype=float)

def _best_key_from_chroma_mean(chroma_mean: np.ndarray) -> Tuple[str, str, float]:
    def z(x):
        x = np.asarray(x, float)
        return (x - x.mean()) / (x.std() + 1e-8)

    cm = z(chroma_mean)
    best_corr, best_key, best_mode = -np.inf, None, None

    for shift in range(12):
        maj = np.roll(_KK_MAJOR, shift)
        minr = np.roll(_KK_MINOR, shift)
        cmaj = np.corrcoef(cm, z(maj))[0,1]
        cmin = np.corrcoef(cm, z(minr))[0,1]
        if cmaj > best_corr:
            best_corr = cmaj
            best_mode = "major"
        if cmin > best_corr:
            best_corr = cmin
            best_mode = "minor"

    return float(best_corr)


In [None]:
def extract_features(audio_path: str, sr_target: int = 22050):
    """Compute tempo, loudness, MFCC, chroma and key-correlation features for one audio file.

    Only the first 30 seconds of the file are loaded.

    Args:
        audio_path: Path of the audio file to analyse.
        sr_target: Sample rate the audio is resampled to on load.

    Returns:
        Dict with scalar entries ``tempo_bpm``, ``rms_mean``, ``rms_std``,
        ``rms_db_mean``, ``rms_db_std``, ``mode_conf`` and array entries
        ``mfcc_mean``, ``mfcc_std`` (13 coefficients each) and ``chroma_mean``.
    """
    y, sr = librosa.load(audio_path, duration=30.0, sr=sr_target, mono=True)

    # One HPSS pass yields both components; calling harmonic() and percussive()
    # separately would run the same decomposition twice.
    y_h, y_p = librosa.effects.hpss(y)

    tempo, _ = librosa.beat.beat_track(y=y_p, sr=sr)
    # Depending on the librosa version, tempo may be a scalar or a one-element
    # array; atleast_1d accepts both.
    tempo_bpm = float(np.atleast_1d(tempo)[0])

    rms = librosa.feature.rms(y=y).flatten()
    rms_db = librosa.amplitude_to_db(rms, ref=1.0)

    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    mfcc_mean = mfcc.mean(axis=1)
    mfcc_std  = mfcc.std(axis=1)

    # Chroma is taken from the harmonic part only.
    chroma = librosa.feature.chroma_cqt(y=y_h, sr=sr)
    chroma_mean = chroma.mean(axis=1)

    corr = _best_key_from_chroma_mean(chroma_mean)

    return {
        "tempo_bpm": tempo_bpm,
        "rms_mean": float(rms.mean()),
        "rms_std": float(rms.std()),
        "rms_db_mean": float(rms_db.mean()),
        "rms_db_std": float(rms_db.std()),
        "mfcc_mean": mfcc_mean,
        "mfcc_std":  mfcc_std,
        "chroma_mean": chroma_mean,
        "mode_conf": corr
    }

In [None]:
def features_to_row(feats: Dict[str, object], path: str) -> dict:
    """Flatten a feature dict into one flat row of scalar columns.

    Scalar features are copied as-is; ``mfcc_mean`` / ``mfcc_std`` become
    ``mfcc_mean_<i>`` / ``mfcc_std_<i>`` and ``chroma_mean`` becomes one
    ``chroma_<pitch>`` column per entry of ``_PITCHES``. ``path`` is accepted
    but not used.
    """
    scalar_keys = ("tempo_bpm", "rms_mean", "rms_std", "rms_db_mean", "rms_db_std", "mode_conf")
    row = {key: feats[key] for key in scalar_keys}

    # MFCC statistics: one column per coefficient index.
    mfcc_means = np.asarray(feats["mfcc_mean"]).tolist()
    row.update({f"mfcc_mean_{i}": float(v) for i, v in enumerate(mfcc_means)})
    mfcc_stds = np.asarray(feats["mfcc_std"]).tolist()
    row.update({f"mfcc_std_{i}": float(v) for i, v in enumerate(mfcc_stds)})

    # Chroma means: one column per pitch-class name.
    chroma_means = np.asarray(feats["chroma_mean"]).tolist()
    row.update({f"chroma_{pitch}": float(v) for pitch, v in zip(_PITCHES, chroma_means)})
    return row

In [None]:
def pipeline(args) -> pd.DataFrame:
    """Extract features for the audio file identified by ``args.id``.

    The file is read from the fixed Drive audio folder as ``<id>.mp3``.
    Returns a one-row DataFrame (feature columns plus ``id``), or an empty
    DataFrame when extraction failed.
    """
    ensure_dir(args.outdir)

    mp3 = f'/content/drive/MyDrive/VAD/song_data/audio/{args.id}.mp3'
    rows = []

    try:
        feats = extract_features(mp3, sr_target=args.sr)
        row = features_to_row(feats, mp3)
        row['id'] = args.id
        rows.append(row)
        print('추출 성공')
    except Exception as e:
        print(f"[err] feature extraction failed: {e}", file=sys.stderr)

    return pd.DataFrame(rows)

In [None]:
def main(chart, args=None):
    """Extract audio features for every ``song_id`` in ``chart`` and append them to a CSV.

    ``pipeline`` is run once per row. Results are appended to the CSV every 20
    songs and once more at the end; the header is written only when the file
    does not exist yet.

    Args:
        chart: DataFrame with a ``song_id`` column.
        args: Optional list of CLI-style option strings. ``None`` makes
            argparse read ``sys.argv``; unknown options are ignored. ``--csv``
            overrides the default output path on Drive.

    Returns:
        DataFrame with every feature row written during this call, or ``None``
        when nothing could be extracted.
    """
    ap = argparse.ArgumentParser(description="Deezer 미리듣기 → MFCC/Chroma/Mode/BPM/RMS + DataFrame/CSV")
    ap.add_argument("--id", type=str)
    ap.add_argument("-o", "--outdir", default="audio")
    ap.add_argument("-n", "--limit", type=int, default=10)
    ap.add_argument("--overwrite", action="store_true")
    ap.add_argument("--sr", type=int, default=22050)
    ap.add_argument("--csv", type=str, default=None)
    ap.add_argument("--csv-encoding", type=str, default="utf-8")

    parsed, _unknown = ap.parse_known_args(args)

    # Single place for the output path; falls back to the original Drive location.
    csv_path = parsed.csv or "/content/drive/MyDrive/VAD/deezer_features_total.csv"

    pending = []  # frames not yet appended to the CSV
    saved = []    # frames already appended during this call

    def _flush():
        """Append all pending frames to the CSV and remember them as saved."""
        written = os.path.exists(csv_path)
        merged = pd.concat(pending, ignore_index=True)
        merged.to_csv(csv_path, mode='a', header=not written, index=False, encoding="utf-8-sig")
        saved.append(merged)
        pending.clear()

    for idx in range(len(chart)):
        song_id = chart['song_id'].iloc[idx]
        print(f"\n [{idx+1}/{len(chart)}] id:{song_id}")

        parsed.id = song_id

        try:
            df = pipeline(parsed)
            if df is not None and not df.empty:
                pending.append(df)
            else:
                print(f"[skip] {song_id}: 분석 결과 없음")
        except Exception as e:
            print(f"[err] {song_id} 실패: {e}")

        if (idx + 1) % 20 == 0 and pending:
            n_frames = len(pending)
            _flush()
            print(f"[autosave] {idx+1}곡 중간 저장 완료 ({n_frames} DataFrames)")

    if pending:
        _flush()

    # Decide on everything saved in this call, not only on the last batch:
    # the final batch may already have been flushed by the autosave above.
    if saved:
        result = pd.concat(saved, ignore_index=True)
        print(f"\n전체 {len(result)}곡 저장 완료")
        return result

    print("\n[warn] 유효한 트랙 데이터가 없습니다.")
    return None

In [None]:
# Load the annotation table; main() iterates over its 'song_id' column.
audio_feat_set = pd.read_csv('song_data/vad/static_annotations_averaged_songs_1_2000.csv')

In [None]:
# Pass the DataFrame itself: main() calls len() and ['song_id'].iloc on its
# argument, which the bare `.loc` indexer object does not support.
main(audio_feat_set)

# EmoBank 번역


In [None]:
!pip install deep-translator

In [None]:
import pandas as pd
import time, os
from deep_translator import GoogleTranslator

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
save_path = '/content/drive/MyDrive/Emobank/lyric_df_translated.csv'
checkpoint_path = '/content/drive/MyDrive/Emobank/lyric_df_checkpoint.csv'

batch_size = 50   # print progress (and pause briefly) every N rows
save_every = 200  # write a checkpoint every N rows


def _translated_count(frame):
    """Count rows whose 'trans' cell holds real text (not NaN, blank or 'nan')."""
    col = frame['trans'].astype(object).where(frame['trans'].notna(), '')
    col = col.astype(str).str.strip()
    return int(((col != '') & (col.str.lower() != 'nan')).sum())


# load
df = pd.read_csv('https://raw.githubusercontent.com/JULIELab/EmoBank/master/corpus/emobank.csv')

if 'trans' not in df.columns:
    df['trans'] = ''

# Resume from the checkpoint only if it matches the corpus and has the column.
if os.path.exists(checkpoint_path):
    df_cp = pd.read_csv(checkpoint_path)
    if len(df_cp) == len(df) and 'trans' in df_cp.columns:
        df = df_cp
        print(f"번역된 {_translated_count(df)}행")

# Empty strings are read back from CSV as NaN; normalise them to '' in an
# object column so strings can be assigned and counts stay correct.
df['trans'] = df['trans'].astype(object).where(df['trans'].notna(), '')

translator = GoogleTranslator(source='auto', target='ko')

# translation
n = len(df)
for i in range(n):
    current = str(df.loc[i, 'trans']).strip()

    # skip rows that already have a translation (resume case)
    if current != '' and current.lower() != 'nan':
        continue

    text = str(df.loc[i, 'text'])
    if text.strip() == '' or text.lower() == 'nan':
        df.loc[i, 'trans'] = ''
        continue

    try:
        # Keep the request below the translator's length limit.
        if len(text) > 4500:
            text = text[:4500]

        # A falsy result (e.g. None) is stored as '' so the row is retried later.
        df.loc[i, 'trans'] = translator.translate(text) or ''

    except Exception as e:
        print(f"{i} 오류: {e}")
        df.loc[i, 'trans'] = ''
        time.sleep(3)
        # No `continue`: a failure on a boundary row must not skip the checkpoint.

    if (i + 1) % batch_size == 0:
        print(f"{i+1}/{n}행 완료")
        time.sleep(0.3)

    # auto save
    if (i + 1) % save_every == 0:
        df.to_csv(checkpoint_path, index=False)
        print(f"{i+1}행 저장")

# save
df.to_csv(save_path, index=False)
df.to_csv(checkpoint_path, index=False)
print(f"번역된 행: {_translated_count(df)}개")