In [None]:
import os
import csv
import json
import numpy as np
from typing import List, Optional, Tuple
from sklearn.neighbors import NearestNeighbors
import joblib


def build_knn_from_csv(csv_path: str, vector_col: str = "genre_vector", metric: str = "cosine") -> Tuple[NearestNeighbors, np.ndarray, List[str], np.ndarray]:
    """Fit a brute-force NearestNeighbors model on the JSON vectors stored in a CSV column.

    Args:
        csv_path: CSV file whose first row is the header.
        vector_col: Header name (case-insensitive) of the column holding JSON-encoded vectors.
        metric: Distance metric forwarded to ``NearestNeighbors``.

    Returns:
        ``(nn, matrix, titles, valid)`` where ``matrix`` has one row per CSV data row
        (rows without a usable vector stay all-zero), ``titles`` has one entry per data
        row (the row number as a string when no title column/cell exists), and ``valid``
        is a boolean mask of rows that were used for fitting. The model is fitted on
        ``matrix[valid]`` only, so neighbour indices it returns refer to that
        compressed array rather than to CSV row numbers.

    Raises:
        FileNotFoundError: ``csv_path`` does not exist.
        ValueError: no data rows, the vector column is missing, or no usable vector exists.
    """
    if not os.path.exists(csv_path):
        raise FileNotFoundError(csv_path)
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as handle:
        table = list(csv.reader(handle))
    if len(table) < 2:
        raise ValueError("CSV has no data rows")

    header, records = table[0], table[1:]

    vector_column = _find_col_idx(header, [vector_col])
    if vector_column is None:
        raise ValueError(f"Vector column not found: {vector_col}")

    title_column = _find_col_idx(header, ["title", "song", "name", "track", "곡명", "제목"])
    titles: List[str] = []
    for row_no, record in enumerate(records):
        if title_column is not None and title_column < len(record):
            titles.append(record[title_column])
        else:
            # Fall back to the data-row number when there is no title to show.
            titles.append(str(row_no))

    parsed: List[Optional[List[float]]] = [
        _parse_json_vector(record[vector_column] if vector_column < len(record) else "")
        for record in records
    ]

    # The first non-empty vector fixes the expected dimensionality.
    dim = next((len(vec) for vec in parsed if isinstance(vec, list) and len(vec) > 0), 0)
    if dim == 0:
        raise ValueError("No valid vectors")

    matrix = np.zeros((len(parsed), dim), dtype=np.float32)
    usable = np.zeros((len(parsed),), dtype=bool)
    for row_no, vec in enumerate(parsed):
        if isinstance(vec, list) and len(vec) == dim:
            matrix[row_no, :] = np.asarray(vec, dtype=np.float32)
            usable[row_no] = True

    if not usable.any():
        raise ValueError("No valid rows for KNN")

    knn = NearestNeighbors(metric=metric, algorithm="brute")
    knn.fit(matrix[usable])
    return knn, matrix, titles, usable


def save_knn_model(model_path: str, nn: NearestNeighbors, matrix: np.ndarray, titles: List[str], valid_mask: np.ndarray, vector_col: str = "genre_vector") -> None:
    """Persist the fitted model and its companion arrays as one joblib file.

    The stored object is a dict with the keys ``vector_col``, ``titles``,
    ``valid_mask``, ``matrix`` and ``nn_model``.
    """
    bundle = dict(
        vector_col=vector_col,
        titles=titles,
        valid_mask=valid_mask,
        matrix=matrix,
        nn_model=nn,
    )
    joblib.dump(bundle, model_path)


# Example usage (adjust paths as needed)
# NOTE(review): CSV_PATH is not assigned in this cell; it has to exist in the kernel
# already (this notebook assigns it in a separate configuration cell), and the helper
# functions used by build_knn_from_csv must have been defined before this cell runs.
try:
    nn_model, M_all, song_titles, valid_mask = build_knn_from_csv(CSV_PATH, vector_col="genre_vector", metric="cosine")
    MODEL_SAVE_PATH = "../models/knn_model.joblib"
    # Make sure the target directory exists before the model file is written into it.
    os.makedirs(os.path.dirname(MODEL_SAVE_PATH), exist_ok=True)
    save_knn_model(MODEL_SAVE_PATH, nn_model, M_all, song_titles, valid_mask, vector_col="genre_vector")
    print(f"[SAVE] KNN model saved to {MODEL_SAVE_PATH}")
except Exception as e:
    # Any failure (including a missing CSV_PATH) is only reported, not re-raised.
    print(f"[ERROR] KNN save failed: {e}")


Genre.py는 csv파일에 있는 데이터를 기반으로 한다.
여기서 쓰이는 함수는 recommend_by_knn이다.
csv 파일에 있는 음악 번호를 입력으로 받아서 그 음악을 좋아하는 사람의 장르와, 좋아하는 사람의 옷 스타일 태그를 집계해서 뽑아준다.
그 음악과 비슷한 공간에 있는 이웃 3명을 선택해서 스타일을 집계한 후 추천하는 시스템

웹에 처음 들어온 사용자가 옷을 3개 고르면 처음으로 추천을 해 주는 장르 추천 시스템으로 쓰일 것이다.
--> 여기서 사용자가 옷을 선택하면 그 데이터를 저장해서 Fashion clip의 학습 데이터로 사용할 수 있게 데모 서버를 만들어야할 것 같다.

In [1]:
# Mount Google Drive in the Colab runtime; the data paths used by this notebook
# are located under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Install the audio/ML dependencies used below (versions are not pinned).
!pip install laion-clap torch torchaudio soundfile librosa
# Second pass points pip at the PyTorch CPU wheel index for torch/torchaudio.
!pip install torch torchaudio --index-url https://download.pytorch.org/whl/cpu

Collecting laion-clap
  Downloading laion_clap-1.1.7-py3-none-any.whl.metadata (26 kB)
Collecting numpy<2.0.0,>=1.23.5 (from laion-clap)
  Downloading numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torchlibrosa (from laion-clap)
  Downloading torchlibrosa-0.1.0-py3-none-any.whl.metadata (3.5 kB)
Collecting ftfy (from laion-clap)
  Downloading ftfy-6.3.1-py3-none-any.whl.metadata (7.3 kB)
Collecting braceexpand (from laion-clap)
  Downloading braceexpand-0.1.7-py2.py3-none-any.whl.metadata (3.0 kB)
Collecting webdataset (from laion-clap)
  Downloading webdataset-1.0.2-py3-none-any.whl.metadata (12 kB)
Collecting wget (from laion-clap)
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting progressbar (from laion-clap)
  Downloading progressbar-2.5.tar.gz (10 kB)
  Pre

Looking in indexes: https://download.pytorch.org/whl/cpu


In [3]:
# 가사 입력
import numpy as np
import torch
import torch.nn.functional as F
import librosa
import os
import csv
import re
import json
from typing import List, Optional, Tuple, Dict
from laion_clap import CLAP_Module

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

In [4]:
# Load the CLAP model (default pretrained model)
clap_model = CLAP_Module(enable_fusion=False)  # fusion=True is for multi-modal fusion
clap_model.load_ckpt()  # automatically loads the pretrained checkpoint

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Load our best checkpoint in the paper.
Downloading laion_clap weight files...
Download completed!
Load Checkpoint...
logit_scale_a 	 Loaded
logit_scale_t 	 Loaded
audio_branch.spectrogram_extractor.stft.conv_real.weight 	 Loaded
audio_branch.spectrogram_extractor.stft.conv_imag.weight 	 Loaded
audio_branch.logmel_extractor.melW 	 Loaded
audio_branch.bn0.weight 	 Loaded
audio_branch.bn0.bias 	 Loaded
audio_branch.patch_embed.proj.weight 	 Loaded
audio_branch.patch_embed.proj.bias 	 Loaded
audio_branch.patch_embed.norm.weight 	 Loaded
audio_branch.patch_embed.norm.bias 	 Loaded
audio_branch.layers.0.blocks.0.norm1.weight 	 Loaded
audio_branch.layers.0.blocks.0.norm1.bias 	 Loaded
audio_branch.layers.0.blocks.0.attn.relative_position_bias_table 	 Loaded
audio_branch.layers.0.blocks.0.attn.qkv.weight 	 Loaded
audio_branch.layers.0.blocks.0.attn.qkv.bias 	 Loaded
audio_branch.layers.0.blocks.0.attn.proj.weight 	 Loaded
audio_branch.layers.0.blocks.0.attn.proj.bias 	 Loaded
audio_branch.laye

In [6]:
# Input locations on the mounted Google Drive.
CSV_PATH = "/content/drive/MyDrive/graduateproject/data/songs_out_final.csv"
MP3_DIR = "/content/drive/MyDrive/graduateproject/data/song_mp3"
TOP_K = 4  # number of top lyric lines to extract per song

In [7]:
# Candidate tags per category. Each tag is turned into a text prompt and embedded once,
# so audio embeddings can later be scored against every tag.
GENRE_TAGS: List[str] = [
    "Hip-Hop", "Rap", "Rock", "Alternative Rock", "Punk",
    "Pop", "Indie Pop", "R&B", "Soul", "Jazz", "Classical",
    "Electronic", "Ambient", "Folk", "Acoustic",
    "Reggae", "Latin", "Country", "Blues", "Experimental"
]

MOOD_TAGS: List[str] = [
    "Happy", "Sad", "Energetic", "Chill", "Relaxed", "Dark", "Romantic",
    "Uplifting", "Melancholic", "Angry", "Nostalgic", "Dreamy",
    "Hypnotic", "Mysterious", "Playful", "Aggressive"
]

TEXTURE_TAGS: List[str] = [
    "Lo-fi", "Clean", "Distorted", "Warm", "Bright", "Harsh", "Smooth",
    "Gritty", "Metallic", "Acoustic", "Synthetic", "Dry", "Wet",
    "Punchy", "Muffled"
]

# Concatenation order defines the row order of TEXT_EMBEDDINGS: genre, mood, texture.
ALL_TAGS: List[str] = GENRE_TAGS + MOOD_TAGS + TEXTURE_TAGS

# Tag -> category mapping.
# NOTE: "Acoustic" is listed in both GENRE_TAGS and TEXTURE_TAGS, and a dict can hold only
# one category per name, so this map reports it as "texture". Use the *_IDX lists below
# when a per-category view is needed.
TAG_CATEGORY_MAP: Dict[str, str] = {}
for t in GENRE_TAGS:
    TAG_CATEGORY_MAP[t] = "genre"
for t in MOOD_TAGS:
    TAG_CATEGORY_MAP[t] = "mood"
for t in TEXTURE_TAGS:
    TAG_CATEGORY_MAP[t] = "texture"

# Text embeddings (tags are wrapped in a sentence prompt to improve accuracy)
TAG_PROMPTS: List[str] = [f"A {t} music" for t in ALL_TAGS]
TEXT_EMBEDDINGS: torch.Tensor = clap_model.get_text_embedding(TAG_PROMPTS, use_tensor=True)

# Category index lists (positions inside ALL_TAGS / rows of TEXT_EMBEDDINGS).
# Built from the category offsets instead of ALL_TAGS.index(tag): index() returns the first
# occurrence, which sent the texture "Acoustic" entry to the genre slot.
_MOOD_OFFSET: int = len(GENRE_TAGS)
_TEXTURE_OFFSET: int = _MOOD_OFFSET + len(MOOD_TAGS)
GENRE_IDX: List[int] = list(range(0, _MOOD_OFFSET))
MOOD_IDX: List[int] = list(range(_MOOD_OFFSET, _TEXTURE_OFFSET))
TEXTURE_IDX: List[int] = list(range(_TEXTURE_OFFSET, len(ALL_TAGS)))

In [9]:
def get_audio_embedding_from_path(path: str) -> torch.Tensor:
    """Load an audio file and return its CLAP audio embedding.

    The waveform is loaded at 48 kHz, reshaped to ``[1, T]``, round-tripped through
    int16 (clip to [-1, 1], scale by 32767, truncate, scale back) and then passed to
    ``clap_model.get_audio_embedding_from_data``.
    """
    waveform, _sample_rate = librosa.load(path, sr=48000)
    waveform = waveform.reshape(1, -1)  # [1, T]

    # Quantization round-trip: float32 -> int16 -> float32.
    clipped = np.clip(waveform, a_min=-1., a_max=1.)
    as_int16 = (clipped * 32767.).astype(np.int16)
    restored = (as_int16 / 32767.0).astype(np.float32)

    audio_tensor = torch.from_numpy(restored).float()
    return clap_model.get_audio_embedding_from_data(x=audio_tensor, use_tensor=True)



In [10]:
def get_top_k_tags_by_category(audio_embedding: torch.Tensor, top_k: int = 4) -> Dict[str, List[str]]:
    """Return the best-matching tags per category for one audio embedding.

    Cosine similarity is computed between ``audio_embedding`` and the precomputed
    ``TEXT_EMBEDDINGS``; for each of genre / mood / texture the ``top_k`` highest
    scoring tags are returned (highest first).

    Categories are resolved through the per-category tag lists and index lists rather
    than ``TAG_CATEGORY_MAP``. That map stores a single category per tag name, so a tag
    listed in two categories ("Acoustic") used to vanish from one category and show up
    twice in the other.

    Args:
        audio_embedding: Audio embedding as produced by ``get_audio_embedding_from_path``.
        top_k: Number of tags to keep per category; values <= 0 yield empty lists.

    Returns:
        ``{"genre": [...], "mood": [...], "texture": [...]}``
    """
    sims = F.cosine_similarity(audio_embedding, TEXT_EMBEDDINGS)  # [N_tags]
    limit = max(0, top_k)

    categories = (
        ("genre", GENRE_TAGS, GENRE_IDX),
        ("mood", MOOD_TAGS, MOOD_IDX),
        ("texture", TEXTURE_TAGS, TEXTURE_IDX),
    )

    top_by_cat: Dict[str, List[str]] = {}
    for cat, tags, indices in categories:
        # Pair every tag of this category with the similarity of its own embedding row.
        pairs: List[Tuple[str, float]] = [
            (tag, float(sims[i].item())) for tag, i in zip(tags, indices)
        ]
        # Stable sort: ties keep the order in which the tags were declared.
        pairs.sort(key=lambda pair: pair[1], reverse=True)
        top_by_cat[cat] = [tag for tag, _ in pairs[:limit]]
    return top_by_cat


def compute_category_tags_for_audio_path(audio_path: str, top_k_per_category: int = 4) -> Tuple[List[str], List[str], List[str]]:
    """Embed the audio file at ``audio_path`` and return its (genre, mood, texture) tag lists."""
    embedding = get_audio_embedding_from_path(audio_path)
    tags_by_category = get_top_k_tags_by_category(embedding, top_k=top_k_per_category)
    genre_tags = tags_by_category.get("genre", [])
    mood_tags = tags_by_category.get("mood", [])
    texture_tags = tags_by_category.get("texture", [])
    return genre_tags, mood_tags, texture_tags


def get_category_score_vectors(audio_embedding: torch.Tensor) -> Tuple[List[float], List[float], List[float]]:
    """Return per-category similarity vectors for one audio embedding.

    Each vector holds the cosine similarity between the audio embedding and the text
    embedding of every tag in that category, so its length equals the length of the
    corresponding index list.

    Returns:
        (genre_scores, mood_scores, texture_scores)
    """
    sims = F.cosine_similarity(audio_embedding, TEXT_EMBEDDINGS)  # [N_tags]

    def _select(positions: List[int]) -> List[float]:
        # Pull the similarities at the given tag positions out as plain Python floats.
        return [float(sims[pos].item()) for pos in positions]

    return _select(GENRE_IDX), _select(MOOD_IDX), _select(TEXTURE_IDX)


def write_category_tags_to_csv(
    csv_path: str,
    mp3_dir: str,
    genre_col: str = "genre_tags",
    mood_col: str = "mood_tags",
    texture_col: str = "texture_tags",
    genre_vec_col: str = "genre_vector",
    mood_vec_col: str = "mood_vector",
    texture_vec_col: str = "texture_vector",
    delimiter: str = " | ",
    top_k_per_category: int = 4,
    store_vectors: bool = True,
) -> None:
    """Compute genre/mood/texture tags for every MP3 and write them into the CSV.

    MP3 files (naturally sorted by file name) are matched to CSV data rows purely by
    position: the i-th file belongs to the i-th data row. Missing columns are created.
    The CSV is rewritten in place.

    Args:
        csv_path: CSV to update; its first row is the header.
        mp3_dir: Directory containing the ``.mp3`` files.
        genre_col / mood_col / texture_col: Columns receiving the joined tag names.
        genre_vec_col / mood_vec_col / texture_vec_col: Columns receiving the JSON-encoded
            similarity vectors (only used when ``store_vectors`` is true).
        delimiter: Separator used when joining tag names.
        top_k_per_category: Number of tags kept per category.
        store_vectors: Whether to also store the per-category similarity vectors.

    Notes:
        A file that fails to process is logged with ``[SKIP]`` and its row receives empty
        tags (and ``[]`` vectors). The audio embedding is computed once per file and
        reused for both the tags and the vectors.
    """
    if not os.path.exists(csv_path):
        print(f"[WARN] CSV가 없습니다: {csv_path}")
        return
    mp3_files = list_mp3_files_sorted(mp3_dir)

    # Load the whole CSV into memory.
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f)
        rows = list(reader)
    if not rows:
        print("[WARN] 빈 CSV")
        return

    header = rows[0]
    data_rows = rows[1:]

    # Look up each column case-insensitively, creating it when absent.
    lower = [c.lower() if isinstance(c, str) else c for c in header]

    def ensure_col(col_name: str) -> int:
        try:
            return lower.index(col_name.lower())
        except ValueError:
            header.append(col_name)
            lower.append(col_name.lower())
            for r in data_rows:
                r.extend([""])
            return len(header) - 1

    genre_idx = ensure_col(genre_col)
    mood_idx = ensure_col(mood_col)
    texture_idx = ensure_col(texture_col)
    if store_vectors:
        genre_vec_idx = ensure_col(genre_vec_col)
        mood_vec_idx = ensure_col(mood_vec_col)
        texture_vec_idx = ensure_col(texture_vec_col)

    n = min(len(mp3_files), len(data_rows))
    if n == 0:
        print("[WARN] 처리할 MP3 또는 데이터 행이 없습니다.")
        return
    if len(mp3_files) != len(data_rows):
        # Rows and files are paired by position only, so a count mismatch usually means
        # some rows would be paired with the wrong file; make that visible.
        print(f"[WARN] MP3 파일 수({len(mp3_files)})와 CSV 데이터 행 수({len(data_rows)})가 다릅니다. 앞의 {n}개만 처리합니다.")

    # Highest column index that will be written; loop-invariant, so computed once.
    max_base = max(genre_idx, mood_idx, texture_idx)
    if store_vectors:
        max_base = max(max_base, genre_vec_idx, mood_vec_idx, texture_vec_idx)

    for i in range(n):
        mp3_path = mp3_files[i]
        try:
            # Embed once and derive both the tags and the vectors from it.
            audio_emb = get_audio_embedding_from_path(mp3_path)
            top_dict = get_top_k_tags_by_category(audio_emb, top_k=top_k_per_category)
            g_tags = top_dict.get("genre", [])
            m_tags = top_dict.get("mood", [])
            t_tags = top_dict.get("texture", [])
            if store_vectors:
                g_vec, m_vec, t_vec = get_category_score_vectors(audio_emb)
        except Exception as e:
            # Best-effort: one unreadable file must not abort the whole batch.
            print(f"[SKIP] #{i+1}: {os.path.basename(mp3_path)} 오류: {e}")
            g_tags, m_tags, t_tags = [], [], []
            if store_vectors:
                g_vec, m_vec, t_vec = [], [], []

        row = data_rows[i]
        # Pad rows that are shorter than the header.
        if max_base >= len(row):
            row.extend([""] * (max_base - len(row) + 1))
        row[genre_idx] = delimiter.join(g_tags)
        row[mood_idx] = delimiter.join(m_tags)
        row[texture_idx] = delimiter.join(t_tags)
        if store_vectors:
            row[genre_vec_idx] = json.dumps(g_vec, ensure_ascii=False)
            row[mood_vec_idx] = json.dumps(m_vec, ensure_ascii=False)
            row[texture_vec_idx] = json.dumps(t_vec, ensure_ascii=False)
        print(f"[TAGS] #{i+1}: {os.path.basename(mp3_path)} → G:{len(g_tags)} M:{len(m_tags)} T:{len(t_tags)}" + (" + vectors" if store_vectors else ""))

    # Save the updated table back to the same path.
    with open(csv_path, "w", encoding="utf-8-sig", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(data_rows)
    print(f"[WRITE] 태그 열 업데이트 완료 → {csv_path}")


def natural_key(s: str) -> List:
    # 숫자를 자연스럽게 정렬하기 위한 키
    return [int(text) if text.isdigit() else text.lower() for text in re.split(r"(\d+)", s)]


def list_mp3_files_sorted(mp3_dir: str) -> List[str]:
    """Return the paths of all ``.mp3`` files directly inside ``mp3_dir``, naturally sorted.

    The extension check is case-insensitive and sorting uses ``natural_key`` on the
    file name. A missing or non-directory path yields an empty list.
    """
    if not os.path.isdir(mp3_dir):
        return []
    paths: List[str] = []
    for entry in os.listdir(mp3_dir):
        if entry.lower().endswith(".mp3"):
            paths.append(os.path.join(mp3_dir, entry))
    return sorted(paths, key=lambda p: natural_key(os.path.basename(p)))


def read_lyrics_from_csv(csv_path: str) -> List[str]:
    """Read one lyrics string per data row from a CSV file.

    If the first row contains one of the known lyric column names
    (``lyrics``, ``lyric``, ``가사``, ``text``, ``content``; case-insensitive) it is
    treated as a header and that column is read. Otherwise the file is assumed to have
    no header: the first row counts as data and the last column of that first row is
    taken as the lyrics column.

    Rows too short to contain the column contribute an empty string. A missing or
    empty file yields an empty list.
    """
    if not os.path.exists(csv_path):
        return []

    known_names = ["lyrics", "lyric", "가사", "text", "content"]
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as handle:
        rdr = csv.reader(handle)
        head = next(rdr, None)
        if head is None:
            return []

        lowered = [cell.lower() if isinstance(cell, str) else cell for cell in head]
        # First known name (in priority order) that appears in the first row, if any.
        column: Optional[int] = next(
            (lowered.index(name.lower()) for name in known_names if name.lower() in lowered),
            None,
        )
        if column is not None:
            # Header recognised: the remaining rows are the data.
            body = list(rdr)
        else:
            # No recognisable header: keep the first row as data and guess the last column.
            column = len(head) - 1 if len(head) > 0 else None
            body = [head] + list(rdr)

    collected: List[str] = []
    for record in body:
        if column is None or column >= len(record):
            collected.append("")
        else:
            value = record[column]
            collected.append(value if isinstance(value, str) else str(value))
    return collected


def split_lyrics_to_lines(lyric_text: str) -> List[str]:
    """Split lyrics on newlines and return the stripped, non-blank lines in order.

    ``None`` or an empty string yields an empty list.
    """
    cleaned: List[str] = []
    for raw in (lyric_text or "").split("\n"):
        stripped = raw.strip()
        if stripped:
            cleaned.append(stripped)
    return cleaned


def extract_top_lines_for_audio(lyric_text: str, audio_path: str, top_k: int = 10) -> List[str]:
    """Return the lyric lines whose CLAP text embedding best matches the audio.

    Lines are ranked by cosine similarity to the audio embedding (highest first);
    duplicate lines are reported once.

    Args:
        lyric_text: Full lyrics; split into non-blank lines.
        audio_path: Audio file to embed.
        top_k: Maximum number of distinct lines to return.

    Returns:
        Up to ``top_k`` distinct lines. Empty when there are no lines or ``top_k <= 0``
        (previously one line was still returned for ``top_k <= 0`` because the limit
        was only checked after appending).
    """
    if top_k <= 0:
        return []
    lines = split_lyrics_to_lines(lyric_text)
    if not lines:
        return []
    text_embs = clap_model.get_text_embedding(lines, use_tensor=True)  # [N, D]
    audio_emb = get_audio_embedding_from_path(audio_path)  # [1, D]
    sims = F.cosine_similarity(audio_emb, text_embs)  # [N]
    sorted_indices = torch.argsort(sims, descending=True)
    top_lines: List[str] = []
    seen: set = set()
    for idx in sorted_indices:
        line = lines[int(idx.item())]
        if line in seen:
            # The same line (e.g. a repeated chorus) is only reported once.
            continue
        top_lines.append(line)
        seen.add(line)
        if len(top_lines) >= top_k:
            break
    return top_lines

def write_summary_to_csv(csv_path: str, tags_list: List[List[str]], summary_col_candidates: Optional[List[str]] = None, delimiter: str = " | ") -> None:
    """Write ``tags_list`` into the summary column of the CSV, creating the column if needed.

    - Summary column candidates default to ["summary", "summary_c(mood_out)", "summary_c", "요약"];
      the first candidate found in the header (case-insensitive) is used, otherwise a new
      column named after the first candidate is appended.
    - Only the first ``min(len(data rows), len(tags_list))`` rows are updated.
    - Each entry of ``tags_list`` is joined with ``delimiter``; an empty entry writes "".

    The file is rewritten in place. A missing or empty CSV only prints a warning.
    """
    candidates = summary_col_candidates
    if candidates is None:
        candidates = ["summary", "summary_c(mood_out)", "summary_c", "요약"]

    if not os.path.exists(csv_path):
        print(f"[WARN] CSV가 없습니다: {csv_path}")
        return

    # Load the whole CSV.
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as src:
        table = list(csv.reader(src))
    if not table:
        print("[WARN] 빈 CSV")
        return

    header, records = table[0], table[1:]

    # Locate the summary column, or append a new one.
    names = [col.lower() if isinstance(col, str) else col for col in header]
    column: Optional[int] = next(
        (names.index(cand.lower()) for cand in candidates if cand.lower() in names),
        None,
    )
    if column is None:
        header.append(candidates[0])
        column = len(header) - 1
        for record in records:
            record.append("")

    # Pair rows with tag lists positionally; zip stops at the shorter of the two.
    for record, tags in zip(records, tags_list):
        shortfall = column - len(record) + 1
        if shortfall > 0:
            # Pad rows that are shorter than the header.
            record.extend([""] * shortfall)
        record[column] = delimiter.join(tags) if tags else ""

    # Save back to the same path.
    with open(csv_path, "w", encoding="utf-8-sig", newline="") as dst:
        out = csv.writer(dst)
        out.writerow(header)
        out.writerows(records)
    print(f"[WRITE] 요약 열 업데이트 완료 → {csv_path}")


In [11]:
def _find_col_idx(header: List[str], candidates: List[str]) -> Optional[int]:
    lower = [c.lower() if isinstance(c, str) else c for c in header]
    for cand in candidates:
        try:
            return lower.index(cand.lower())
        except ValueError:
            continue
    return None


In [12]:
def _find_col_indices(header: List[str], candidates: List[str]) -> List[int]:
    """Return indices for all columns whose names match any in candidates (case-insensitive)."""
    lower = [c.lower() if isinstance(c, str) else c for c in header]
    wanted = {c.lower() for c in candidates}
    indices: List[int] = []
    for i, name in enumerate(lower):
        if isinstance(name, str) and name in wanted:
            indices.append(i)
    return indices



In [14]:
def _parse_json_vector(cell: str) -> Optional[List[float]]:
    if not isinstance(cell, str) or not cell.strip():
        return None
    try:
        v = json.loads(cell)
        if isinstance(v, list) and all(isinstance(x, (int, float)) for x in v):
            return [float(x) for x in v]
    except Exception:
        return None
    return None


def _resolve_seed_to_index(seed: object, titles: List[str]) -> Optional[int]:
    # Numeric index support (both 0-based and 1-based attempts)
    try:
        idx = int(seed)
        if 0 <= idx < len(titles):
            return idx
        if 1 <= idx <= len(titles):
            return idx - 1
    except Exception:
        pass
    # Title matching (case-insensitive, stripped)
    if isinstance(seed, str):
        target = seed.strip().lower()
        for i, t in enumerate(titles):
            if isinstance(t, str) and t.strip().lower() == target:
                return i
    return None

def recommend_by_knn(
    csv_path: str,
    seeds: List[object],
    k_neighbors: int = 3,
    per_seed_top: int = 3,
    final_top: int = 3,
    vector_col: str = "genre_vector",
    label_col_candidates: Optional[List[str]] = None,
    title_col_candidates: Optional[List[str]] = None,
) -> Dict[str, object]:
    """Recommend style labels (and genres) from the nearest neighbours of three seed songs.

    For each seed, the ``k_neighbors`` most cosine-similar rows (by ``vector_col``) are
    found, their label cells are split on ``|`` / ``,`` and counted, and the
    ``per_seed_top`` most frequent labels are kept. The per-seed winners are then
    counted across seeds and the ``final_top`` most frequent are returned. The same is
    done for the ``genre_tags`` column when it exists. Ties are broken alphabetically.

    Args:
        csv_path: CSV file whose first row is the header.
        seeds: Exactly three seeds, each a song title or a row number (int or numeric string).
        k_neighbors: Neighbours to use per seed.
        per_seed_top: Labels / genres kept per seed.
        final_top: Labels / genres kept overall.
        vector_col: Column holding the JSON-encoded vectors.
        label_col_candidates: Label column names; every matching column is used.
        title_col_candidates: Title column names; the first match is used.

    Returns:
        Dict with the keys ``seed_titles``, ``per_seed_top_labels``,
        ``per_seed_top_genres``, ``final_top_labels``, ``final_top_genres``,
        ``neighbors_indices`` (data-row indices per seed) and
        ``per_seed_neighbor_titles``. All ``per_seed_*`` lists and ``neighbors_indices``
        have exactly one entry per seed, in seed order; a seed without a usable vector
        gets empty lists.

    Raises:
        FileNotFoundError: ``csv_path`` does not exist.
        ValueError: empty CSV, no data rows, missing vector column, no usable vector,
            wrong number of seeds, or a seed that cannot be resolved.

    Notes:
        Only rows with a finite similarity to the seed qualify as neighbours. This keeps
        the seed itself and zero-norm rows out of the result even when ``k_neighbors``
        is at least the number of usable rows.
    """
    if label_col_candidates is None:
        # By default, prefer the three style columns of the CSV.
        label_col_candidates = [
            "style_first", "style_second", "style_third",
            "outfit_style", "style", "clothes_style", "cloth_style",
            "패션", "스타일", "outfit"
        ]
    if title_col_candidates is None:
        title_col_candidates = ["title", "song", "name", "track", "곡명", "제목"]

    # Load CSV
    if not os.path.exists(csv_path):
        raise FileNotFoundError(csv_path)
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f)
        rows = list(reader)
    if not rows:
        raise ValueError("빈 CSV")

    header = rows[0]
    data_rows = rows[1:]
    if not data_rows:
        raise ValueError("데이터 행이 없습니다")

    vec_idx = _find_col_idx(header, [vector_col])
    if vec_idx is None:
        raise ValueError(f"벡터 열을 찾을 수 없습니다: {vector_col}")

    title_idx = _find_col_idx(header, title_col_candidates)
    titles = [r[title_idx] if (title_idx is not None and title_idx < len(r)) else str(i) for i, r in enumerate(data_rows)]

    # Multiple label columns are supported (e.g. style_first/second/third).
    label_indices = _find_col_indices(header, label_col_candidates)
    # Genre tag column is optional.
    genre_idx_opt = _find_col_idx(header, ["genre_tags"])

    # Parse one vector per data row (None when the cell is unusable).
    vectors: List[Optional[List[float]]] = []
    for r in data_rows:
        cell = r[vec_idx] if vec_idx < len(r) else ""
        vectors.append(_parse_json_vector(cell))
    # The first non-empty vector fixes the expected dimensionality.
    dim = 0
    for v in vectors:
        if isinstance(v, list) and len(v) > 0:
            dim = len(v)
            break
    if dim == 0:
        raise ValueError("유효한 벡터가 없습니다")

    import numpy as _np
    M = _np.zeros((len(vectors), dim), dtype=_np.float32)
    valid = _np.zeros((len(vectors),), dtype=_np.bool_)
    for i, v in enumerate(vectors):
        if isinstance(v, list) and len(v) == dim:
            M[i, :] = _np.array(v, dtype=_np.float32)
            valid[i] = True

    def _cosine_sim_matrix_row(mat: _np.ndarray, idx: int) -> _np.ndarray:
        # Cosine similarity of row `idx` against every row; -inf where it is undefined
        # (zero-norm query or zero-norm row).
        q = mat[idx]
        q_norm = _np.linalg.norm(q)
        if q_norm == 0:
            return _np.full((mat.shape[0],), -_np.inf, dtype=_np.float32)
        dot = mat @ q
        mat_norms = _np.linalg.norm(mat, axis=1)
        denom = (mat_norms * q_norm)
        with _np.errstate(divide='ignore', invalid='ignore'):
            sims = _np.where(denom > 0, dot / denom, -_np.inf)
        return sims

    def _tally(cell: object, counter: Dict[str, int]) -> None:
        # Split a cell on "|" or "," and count every non-empty part.
        if not isinstance(cell, str) or not cell.strip():
            return
        for part in re.split(r"\s*[|,]\s*", cell.strip()):
            if part:
                counter[part] = counter.get(part, 0) + 1

    def _top_keys(counter: Dict[str, int], limit: int) -> List[str]:
        # Most frequent keys first; ties broken alphabetically.
        ranked = sorted(counter.items(), key=lambda x: (-x[1], x[0]))
        return [k for k, _ in ranked[:max(0, limit)]]

    # Resolve seeds -> data-row indices
    if len(seeds) != 3:
        raise ValueError("seeds는 3개여야 합니다")
    seed_indices: List[int] = []
    for s in seeds:
        idx = _resolve_seed_to_index(s, titles)
        if idx is None:
            raise ValueError(f"seed를 찾을 수 없습니다: {s}")
        seed_indices.append(idx)
    seed_titles: List[str] = [titles[i] for i in seed_indices]

    per_seed_neighbors: List[List[int]] = []
    per_seed_neighbor_titles: List[List[str]] = []
    per_seed_top_labels: List[List[str]] = []
    per_seed_top_genres: List[List[str]] = []
    global_counts: Dict[str, int] = {}
    global_genre_counts: Dict[str, int] = {}

    for si in seed_indices:
        if not valid[si]:
            # Seed has no usable vector: add an empty placeholder to EVERY per-seed list
            # so they all stay aligned with `seed_titles`.
            per_seed_neighbors.append([])
            per_seed_neighbor_titles.append([])
            per_seed_top_labels.append([])
            per_seed_top_genres.append([])
            continue
        sims = _cosine_sim_matrix_row(M, si)
        sims[si] = -_np.inf  # exclude self
        # Rank by similarity (descending); keep only usable rows with a finite
        # similarity, which drops the seed itself and zero-norm rows.
        order = _np.argsort(-sims)
        nn_idx = [int(i) for i in order if valid[int(i)] and _np.isfinite(sims[int(i)])]
        nn_idx = nn_idx[:max(0, k_neighbors)]
        per_seed_neighbors.append(nn_idx)
        per_seed_neighbor_titles.append([titles[ni] for ni in nn_idx])

        # Count labels (all label columns) and genres over the neighbours.
        counts: Dict[str, int] = {}
        genre_counts: Dict[str, int] = {}
        for ni in nn_idx:
            row = data_rows[ni]
            for li in label_indices:
                if li < len(row):
                    _tally(row[li], counts)
            if genre_idx_opt is not None and genre_idx_opt < len(row):
                _tally(row[genre_idx_opt], genre_counts)

        # Top labels for this seed; each contributes one vote to the global tally.
        top_labels = _top_keys(counts, per_seed_top)
        per_seed_top_labels.append(top_labels)
        for k in top_labels:
            global_counts[k] = global_counts.get(k, 0) + 1
        # Top genres for this seed.
        top_genres = _top_keys(genre_counts, per_seed_top)
        per_seed_top_genres.append(top_genres)
        for k in top_genres:
            global_genre_counts[k] = global_genre_counts.get(k, 0) + 1

    final_top_labels = _top_keys(global_counts, final_top)
    final_top_genres = _top_keys(global_genre_counts, final_top)

    return {
        "seed_titles": seed_titles,
        "per_seed_top_labels": per_seed_top_labels,
        "per_seed_top_genres": per_seed_top_genres,
        "final_top_labels": final_top_labels,
        "final_top_genres": final_top_genres,
        "neighbors_indices": per_seed_neighbors,
        "per_seed_neighbor_titles": per_seed_neighbor_titles,
    }


def recommend_by_knn_multi_vectors(
    csv_path: str,
    seeds: List[object],
    k_neighbors: int = 3,
    per_seed_top: int = 3,
    final_top: int = 3,
    vector_cols: Optional[List[str]] = None,
    label_col_candidates: Optional[List[str]] = None,
    title_col_candidates: Optional[List[str]] = None,
) -> Dict[str, object]:
    """
    Aggregate style/genre recommendations using several vector columns at once.

    For every seed and every usable vector column, the ``k_neighbors`` rows with
    the highest cosine similarity to the seed row are collected. The seed itself
    and rows whose similarity is undefined (zero-norm or unusable vectors) are
    never returned as neighbors. The label cells (and the optional ``genre_tags``
    cell) of those neighbors are split on ``|`` / ``,`` and counted; a neighbor
    that is found through several vector columns is counted once per column.
    The ``per_seed_top`` most frequent labels are kept per seed, and the labels
    that appear in the most per-seed lists form the final ``final_top`` ranking.
    Ties are broken alphabetically for labels and by row order for neighbors.

    Args:
        csv_path: Path of the CSV file (read as UTF-8, BOM tolerated).
        seeds: Exactly three seeds; each is resolved to a data-row index by
            ``_resolve_seed_to_index`` against the title column.
        k_neighbors: Neighbors taken per seed and per vector column
            (values <= 0 yield no neighbors).
        per_seed_top: Number of labels/genres kept per seed.
        final_top: Number of labels/genres in the final ranking.
        vector_cols: Vector column names; defaults to genre/mood/texture vectors.
        label_col_candidates: Candidate label column names; defaults to the
            style-related names listed below.
        title_col_candidates: Candidate title column names.

    Returns:
        Dict with the keys
          "seed_titles": List[str]
          "per_seed_top_labels": List[List[str]]
          "per_seed_top_genres": List[List[str]]
          "final_top_labels": List[str]
          "final_top_genres": List[str]
          "neighbors_indices": List[List[int]]  # per seed, duplicates possible
          "per_seed_neighbor_titles": List[List[str]]

    Raises:
        FileNotFoundError: ``csv_path`` does not exist.
        ValueError: the CSV is empty or has no data rows, no vector column or no
            usable vector matrix is found, ``seeds`` does not contain exactly
            three entries, or a seed cannot be resolved.
    """
    if vector_cols is None:
        vector_cols = ["genre_vector", "mood_vector", "texture_vector"]
    if label_col_candidates is None:
        label_col_candidates = [
            "style_first", "style_second", "style_third",
            "outfit_style", "style", "clothes_style", "cloth_style",
            "패션", "스타일", "outfit"
        ]
    if title_col_candidates is None:
        title_col_candidates = ["title", "song", "name", "track", "곡명", "제목"]

    # Load CSV
    if not os.path.exists(csv_path):
        raise FileNotFoundError(csv_path)
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        rows = list(csv.reader(f))
    if not rows:
        raise ValueError("빈 CSV")

    header = rows[0]
    data_rows = rows[1:]
    if not data_rows:
        raise ValueError("데이터 행이 없습니다")

    # Column indices
    vec_indices: List[int] = []
    for col in vector_cols:
        idx = _find_col_idx(header, [col])
        if idx is not None:
            vec_indices.append(idx)
    if not vec_indices:
        raise ValueError(f"벡터 열을 찾을 수 없습니다: {vector_cols}")

    title_idx = _find_col_idx(header, title_col_candidates)
    # Fall back to the row position (as text) when no title is available.
    titles = [
        r[title_idx] if (title_idx is not None and title_idx < len(r)) else str(i)
        for i, r in enumerate(data_rows)
    ]

    label_indices = _find_col_indices(header, label_col_candidates)
    # Genre tag column (optional; may be absent)
    genre_idx_opt = _find_col_idx(header, ["genre_tags"])

    # Build one (rows x dim) matrix, validity mask and row-norm vector per column
    import numpy as _np
    matrices: List[_np.ndarray] = []
    valids: List[_np.ndarray] = []
    row_norms: List[_np.ndarray] = []

    for vi in vec_indices:
        vectors: List[Optional[List[float]]] = [
            _parse_json_vector(r[vi] if vi < len(r) else "") for r in data_rows
        ]
        # The first non-empty vector fixes the dimensionality of this column.
        dim = next((len(v) for v in vectors if isinstance(v, list) and len(v) > 0), 0)
        if dim == 0:
            # Skip columns that hold no vector at all
            continue
        M = _np.zeros((len(vectors), dim), dtype=_np.float32)
        valid = _np.zeros((len(vectors),), dtype=_np.bool_)
        for i, v in enumerate(vectors):
            if isinstance(v, list) and len(v) == dim:
                M[i, :] = _np.array(v, dtype=_np.float32)
                valid[i] = True
        matrices.append(M)
        valids.append(valid)
        # Row norms do not depend on the seed, so compute them once per matrix.
        row_norms.append(_np.linalg.norm(M, axis=1))

    if not matrices:
        raise ValueError("유효한 벡터 행렬이 없습니다")

    def _cosine_sim_to_row(mat: _np.ndarray, norms: _np.ndarray, idx: int) -> _np.ndarray:
        """Cosine similarity of every row to row ``idx``; -inf where undefined."""
        q = mat[idx]
        q_norm = _np.linalg.norm(q)
        if q_norm == 0:
            return _np.full((mat.shape[0],), -_np.inf, dtype=_np.float32)
        dot = mat @ q
        denom = norms * q_norm
        with _np.errstate(divide='ignore', invalid='ignore'):
            return _np.where(denom > 0, dot / denom, -_np.inf)

    tag_splitter = re.compile(r"\s*[|,]\s*")

    def _tally(cell: object, counter: Dict[str, int]) -> None:
        """Split a '|'/','-separated cell and add each non-empty part to ``counter``."""
        if not isinstance(cell, str) or not cell.strip():
            return
        for part in tag_splitter.split(cell.strip()):
            if part:
                counter[part] = counter.get(part, 0) + 1

    def _top_keys(counter: Dict[str, int], limit: int) -> List[str]:
        """Keys ordered by descending count, then alphabetically, cut to ``limit``."""
        ranked = sorted(counter.items(), key=lambda kv: (-kv[1], kv[0]))
        return [key for key, _ in ranked[:max(0, limit)]]

    # Resolve seeds → indices
    if len(seeds) != 3:
        raise ValueError("seeds는 3개여야 합니다")
    seed_indices: List[int] = []
    for s in seeds:
        idx = _resolve_seed_to_index(s, titles)
        if idx is None:
            raise ValueError(f"seed를 찾을 수 없습니다: {s}")
        seed_indices.append(idx)
    seed_titles: List[str] = [titles[i] for i in seed_indices]

    per_seed_neighbors: List[List[int]] = []
    per_seed_neighbor_titles: List[List[str]] = []
    per_seed_top_labels: List[List[str]] = []
    per_seed_top_genres: List[List[str]] = []
    global_counts: Dict[str, int] = {}
    global_genre_counts: Dict[str, int] = {}

    for si in seed_indices:
        combined_neighbors: List[int] = []
        for mat, valid, norms in zip(matrices, valids, row_norms):
            if si >= len(valid) or not valid[si]:
                continue
            sims = _cosine_sim_to_row(mat, norms, si)
            sims[si] = -_np.inf  # exclude self
            # Stable sort keeps row order among equal similarities.
            order = _np.argsort(-sims, kind="stable")
            # Only rows with a usable vector AND a defined similarity qualify;
            # this keeps the seed itself and zero-norm rows out even when
            # k_neighbors exceeds the number of real candidates.
            eligible = valid & (sims > -_np.inf)
            ranked = order[eligible[order]]
            combined_neighbors.extend(int(i) for i in ranked[:max(0, k_neighbors)])
        per_seed_neighbors.append(combined_neighbors)
        per_seed_neighbor_titles.append([titles[ni] for ni in combined_neighbors])

        # Aggregate labels among combined neighbors (duplicates count multiple times)
        counts: Dict[str, int] = {}
        genre_counts: Dict[str, int] = {}
        for ni in combined_neighbors:
            row = data_rows[ni]
            for li in label_indices:
                if li < len(row):
                    _tally(row[li], counts)
            # genre aggregation
            if genre_idx_opt is not None and genre_idx_opt < len(row):
                _tally(row[genre_idx_opt], genre_counts)

        # top labels for this seed; each seed contributes one vote per label
        top_labels = _top_keys(counts, per_seed_top)
        per_seed_top_labels.append(top_labels)
        for k in top_labels:
            global_counts[k] = global_counts.get(k, 0) + 1
        # top genres for this seed
        top_genres = _top_keys(genre_counts, per_seed_top)
        per_seed_top_genres.append(top_genres)
        for k in top_genres:
            global_genre_counts[k] = global_genre_counts.get(k, 0) + 1

    final_top_labels = _top_keys(global_counts, final_top)
    final_top_genres = _top_keys(global_genre_counts, final_top)

    return {
        "seed_titles": seed_titles,
        "per_seed_top_labels": per_seed_top_labels,
        "per_seed_top_genres": per_seed_top_genres,
        "final_top_labels": final_top_labels,
        "final_top_genres": final_top_genres,
        "neighbors_indices": per_seed_neighbors,
        "per_seed_neighbor_titles": per_seed_neighbor_titles,
    }

In [19]:
if __name__ == "__main__":
    import argparse
    import sys

    # Seeds used when --seeds is not supplied inside an IPython/Colab session.
    # These are the values the recommender call was previously hard-wired to,
    # so the default notebook run keeps producing the same recommendations.
    DEFAULT_NOTEBOOK_SEEDS = [15, 22, 36]

    parser = argparse.ArgumentParser(description="k-NN recommendation using genre/mood/texture vectors")
    parser.add_argument("--csv_path", default=CSV_PATH)
    parser.add_argument("--seeds", nargs="+", required=False, default=None, help="3 song titles or indices (0/1-based)")
    parser.add_argument("--k_neighbors", type=int, default=3)
    parser.add_argument("--per_seed_top", type=int, default=3)
    parser.add_argument("--final_top", type=int, default=3)
    parser.add_argument(
        "--vector_cols", nargs="+", default=["genre_vector", "mood_vector", "texture_vector"],
        help="Vector columns to use (default: genre/mood/texture)"
    )
    parser.add_argument("--label_cols", nargs="+", default=None, help="Label columns to aggregate (e.g., outfit_style)")
    parser.add_argument("--title_cols", nargs="+", default=None, help="Title columns (e.g., title song name track)")
    # parse_known_args tolerates the extra argv entries a notebook kernel injects.
    args, _unknown = parser.parse_known_args()

    seeds_arg = args.seeds if args.seeds is not None else []
    if len(seeds_arg) != 3:
        try:
            get_ipython  # type: ignore  # NameError outside IPython/Colab
            print(f"[INFO] --seeds 미지정: Colab/IPython 환경으로 감지되어 기본 {DEFAULT_NOTEBOOK_SEEDS} 사용")
            seeds_arg = list(DEFAULT_NOTEBOOK_SEEDS)
        except NameError:
            print("[ERROR] --seeds 는 정확히 3개를 지정해야 합니다. 예) --seeds 1 2 3")
            sys.exit(2)

    # Numeric seeds become ints (row indices); anything else stays a title string.
    parsed_seeds = []
    for s in seeds_arg:
        try:
            parsed_seeds.append(int(s))
        except (TypeError, ValueError):
            parsed_seeds.append(s)

    # Pass the seeds that were actually parsed; they used to be ignored in
    # favour of a literal list, which made --seeds a no-op.
    result = recommend_by_knn_multi_vectors(
        csv_path=args.csv_path,
        seeds=parsed_seeds,
        k_neighbors=args.k_neighbors,
        per_seed_top=args.per_seed_top,
        final_top=args.final_top,
        vector_cols=args.vector_cols,
        label_col_candidates=args.label_cols,
        title_col_candidates=args.title_cols,
    )

    seed_titles = result.get("seed_titles", [])
    neighbor_titles = result.get("per_seed_neighbor_titles", [])
    per_seed_labels = result.get("per_seed_top_labels", [])
    per_seed_genres = result.get("per_seed_top_genres", [])
    final_labels = result.get("final_top_labels", [])
    final_genres = result.get("final_top_genres", [])

    print("[RESULT] Seeds:")
    for i, st in enumerate(seed_titles, 1):
        print(f"  Seed#{i} title: {st}")

    print("[RESULT] per-seed neighbors (titles):")
    for i, nt in enumerate(neighbor_titles, 1):
        print(f"  Seed#{i} neighbors: {nt}")

    print("[RESULT] per-seed top styles:")
    for i, labels in enumerate(per_seed_labels, 1):
        print(f"  Seed#{i}: {labels}")

    # Genre sections are printed only when genre data was aggregated.
    if per_seed_genres:
        print("[RESULT] per-seed top genres:")
        for i, genres in enumerate(per_seed_genres, 1):
            print(f"  Seed#{i}: {genres}")

    print(f"[RESULT] final top styles: {final_labels}")
    if final_genres:
        print(f"[RESULT] final top genres: {final_genres}")

[INFO] --seeds 미지정: Colab/IPython 환경으로 감지되어 기본 [1, 2, 3] 사용
[RESULT] Seeds:
  Seed#1 title: toxic till the end
  Seed#2 title: 끝
  Seed#3 title: 우리들의 블루스
[RESULT] per-seed neighbors (titles):
  Seed#1 neighbors: ['No Choice', '눈물참기', 'EXTRA', 'CaNdY LiEs', 'Magic Shop', 'Different Lives', '챔피언 (Feat. 정인)', '씨스루 (Feat. Zion.T, 개코 Of 다이나믹듀오)', 'FAMOUS']
  Seed#2 neighbors: ['귀로', '서쪽 하늘', '마마', '마마', '귀로', '처음처럼', '귀로', '서쪽 하늘', 'I Fall In Love Too Easily']
  Seed#3 neighbors: ['등대', '서툰 이별을 하려해 (Feat.전상근)', 'Do You Hear What I Hear', '...사랑했잖아...(2024)', '우리 왜 헤어져야 해 (여름날 우리 X 전상근)', 'my story', '서툰 이별을 하려해 (Feat.전상근)', '우리 왜 헤어져야 해 (여름날 우리 X 전상근)', '등대']
[RESULT] per-seed top styles:
  Seed#1: ['미니멀 / Minimal Fashion', '캐주얼 / Casual', '스트릿 / Street Fashion']
  Seed#2: ['캐주얼 / Casual', '시크 / Chic Fashion', '미니멀 / Minimal Fashion']
  Seed#3: ['캐주얼 / Casual', '스트릿 / Street Fashion', '미니멀 / Minimal Fashion']
[RESULT] per-seed top genres:
  Seed#1: ['Indie Pop', 'Pop', 'Soul']
  Seed#2: ['A

In [None]:
# Runs the recommender as a standalone script with seeds 1, 2 and 3.
# NOTE(review): requires main.py in the current working directory; the recorded
# run failed because /content/main.py did not exist.
!python main.py --seeds 1 2 3

python3: can't open file '/content/main.py': [Errno 2] No such file or directory


[TAGS] #1: 001 - JAY-Z - NEW YORK (CONCEPT DE PARIS) (Feat. Gil Scott-Heron).mp3 → G:4 M:4 T:4 + vectors
[TAGS] #2: 002 - Lady Gaga - Die With A Smile.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #3: 003 - Fly By Midnight - Different Lives.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #4: 004 - BOYNEXTDOOR - 오늘만 I LOVE YOU.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #5: 005 - Crush - 가끔.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #6: 006 - syudou - Call Boy.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #7: 007 - 【初音ミク】コールボーイ【syudou】.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #8: 008 - Medusa - 뱀(Snake).mp3 → G:4 M:4 T:4 + vectors
[TAGS] #9: 009 - Motorhead - The Chase Is Better Than the Catch (Live at Sporthalle, Heilbronn, 29th December 1984).mp3 → G:4 M:4 T:4 + vectors
[TAGS] #10: 010 - Yuuri - BETELGEUSE.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #11: 011 - jazzyfact - vibra.mp3 → G:4 M:4 T:4 + vectors
[TAGS] #12: 012 - 조째즈 - 모르시나요(PROD.로코베리).mp3 → G:4 M:4 T:4 + vectors
[TAGS] #13: 013 - 창모 (CHANGMO) - PURE RAGE (Remix) (