In [None]:
from pymongo import MongoClient
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# 1. 유저 정보 불러오기
def load_users(uri):
    """Load every user's liked-song titles from the ``user`` database.

    Each collection in the database is treated as one user: the collection
    name becomes the ``user_id`` and the ``title`` field of every document
    that has one becomes an entry in ``liked_songs``.

    Args:
        uri: MongoDB connection URI.

    Returns:
        A list of ``{"user_id": ..., "liked_songs": [...]}`` dicts, one per
        collection.
    """
    users = []
    # Using the client as a context manager closes it (and its connection
    # pool) even if a query raises; previously the client was never closed.
    with MongoClient(uri) as client:
        db = client["user"]
        for collection_name in db.list_collection_names():
            liked_songs = [
                doc["title"]
                for doc in db[collection_name].find({}, {"_id": 0})
                if "title" in doc
            ]
            users.append({"user_id": collection_name, "liked_songs": liked_songs})

    return users

# 2. 전체 곡 정보 불러오기
def load_all_songs(uri, db_name):
    """Load every document from every collection of a database.

    Args:
        uri: MongoDB connection URI.
        db_name: Name of the database whose collections are read.

    Returns:
        A flat list of documents (dicts) with the ``_id`` field removed.
    """
    all_songs = []
    # Using the client as a context manager closes it even if a query
    # raises; previously the client was never closed.
    with MongoClient(uri) as client:
        db = client[db_name]
        for collection_name in db.list_collection_names():
            all_songs.extend(db[collection_name].find({}, {"_id": 0}))

    return all_songs

# 3. 곡별 장르 벡터 생성
def _normalized_genres(song):
    """Return the song's genres as a list of stripped, lower-cased strings."""
    raw = song.get("genres") or []
    if isinstance(raw, str):
        # A single genre stored as a plain string must not be iterated
        # character by character.
        raw = [raw]
    return [g.strip().lower() for g in raw if isinstance(g, str) and g.strip()]


def make_song_vectors(songs):
    """Build a multi-hot genre vector for every titled song.

    The genre vocabulary is the sorted set of normalized genres found across
    all songs. Songs without a ``title`` are skipped (they cannot be keyed);
    songs with missing or empty genres get an all-zero vector. When several
    songs share a title, the last one wins.

    Args:
        songs: Iterable of song dicts with optional ``title`` and ``genres``
            keys.

    Returns:
        A ``(song_vectors, genres)`` tuple: a dict mapping title to a NumPy
        vector, and the sorted list of genre names that index the vector.
    """
    genres = sorted({genre for song in songs for genre in _normalized_genres(song)})
    genre_idx = {genre: idx for idx, genre in enumerate(genres)}

    song_vectors = {}
    for song in songs:
        title = song.get("title")
        if title is None:
            continue
        vec = np.zeros(len(genres))
        for genre in _normalized_genres(song):
            vec[genre_idx[genre]] = 1
        song_vectors[title] = vec

    return song_vectors, genres

# 4. 유저 장르 벡터 생성
def make_user_vectors(users, song_vectors):
    """Average the genre vectors of each user's liked songs.

    Args:
        users: Iterable of dicts with ``user_id`` and ``liked_songs`` keys.
        song_vectors: Mapping of song title to a NumPy genre vector.

    Returns:
        A dict mapping ``user_id`` to the mean vector of the liked songs
        found in ``song_vectors``. Users with no such song are omitted.
        An empty ``song_vectors`` yields an empty dict.
    """
    if not song_vectors:
        # No song has a vector, so no user profile can be built. The
        # previous implementation raised StopIteration here.
        return {}

    # All song vectors share one length; look it up once, not per user.
    dim = len(next(iter(song_vectors.values())))

    user_vectors = {}
    for user in users:
        genre_sum = np.zeros(dim)
        count = 0
        for title in user["liked_songs"]:
            if title in song_vectors:
                genre_sum += song_vectors[title]
                count += 1
        if count > 0:
            user_vectors[user["user_id"]] = genre_sum / count
    return user_vectors

# 5. 협업 필터링 기반 추천 후보
def recommend_songs_cf(target_user_id, users, user_vectors, top_k=2, top_n=100):
    """Collect candidate songs liked by the users most similar to the target.

    Similarity is the cosine similarity between user genre vectors. The
    candidates are the songs liked by the ``top_k`` most similar users that
    the target user has not already liked.

    Args:
        target_user_id: Id of the user to recommend for.
        users: List of dicts with ``user_id`` and ``liked_songs`` keys.
        user_vectors: Mapping of user id to a NumPy genre vector.
        top_k: Number of most-similar users to draw candidates from.
        top_n: Maximum number of candidates returned.

    Returns:
        A list of at most ``top_n`` distinct titles, ordered by the
        similarity rank of the user who liked them and then by that user's
        own like order.

    Raises:
        ValueError: If the target user has no entry in ``user_vectors``.
    """
    if target_user_id not in user_vectors:
        raise ValueError(f"{target_user_id} 벡터가 없습니다.")

    target_vec = user_vectors[target_user_id].reshape(1, -1)
    similarities = [
        (user_id, cosine_similarity(target_vec, vec.reshape(1, -1))[0][0])
        for user_id, vec in user_vectors.items()
        if user_id != target_user_id
    ]
    similarities.sort(key=lambda pair: pair[1], reverse=True)
    rank = {uid: position for position, (uid, _) in enumerate(similarities[:top_k])}

    # A default avoids leaking StopIteration when the target has a vector
    # but no record in `users`; there is then simply nothing to exclude.
    target_likes = next(
        (u["liked_songs"] for u in users if u["user_id"] == target_user_id), []
    )

    # Visit neighbours from most to least similar and keep first-seen order.
    # Slicing a set here made both the order and the top_n cut-off depend on
    # hash ordering, so results changed from run to run.
    neighbours = sorted(
        (u for u in users if u["user_id"] in rank),
        key=lambda u: rank[u["user_id"]],
    )
    seen = set(target_likes)
    candidates = []
    for user in neighbours:
        for title in user["liked_songs"]:
            if title not in seen:
                seen.add(title)
                candidates.append(title)

    return candidates[:top_n]

# 6. 하이브리드 추천
def hybrid_recommend_songs(target_user_id, users, user_vectors, song_vectors, songs, top_k=2, top_n=5, alpha=0.5):
    """Rank songs by a blend of content-based and collaborative scores.

    Every collaborative-filtering candidate gets a CF score of 1.0. Every
    song the target has not liked and that has a vector gets a content
    score: the cosine similarity between its genre vector and the target's
    genre vector. The final score is
    ``alpha * content + (1 - alpha) * cf``, with a missing component
    counted as 0. The ranked top ``top_n`` entries are printed and returned.

    Args:
        target_user_id: Id of the user to recommend for.
        users: List of dicts with ``user_id`` and ``liked_songs`` keys.
        user_vectors: Mapping of user id to a NumPy genre vector.
        song_vectors: Mapping of song title to a NumPy genre vector.
        songs: Iterable of song dicts; only ``title`` is read here.
        top_k: Number of similar users used for the CF candidates.
        top_n: Number of recommendations printed and returned.
        alpha: Weight of the content-based score, between 0 and 1.

    Returns:
        The list of recommended titles, best first.

    Raises:
        ValueError: Propagated from ``recommend_songs_cf`` when the target
            user has no vector.
    """
    cf_candidates = recommend_songs_cf(target_user_id, users, user_vectors, top_k=top_k, top_n=100)
    cf_scores = {title: 1.0 for title in cf_candidates}

    # Reshape once instead of once per song.
    target_row = user_vectors[target_user_id].reshape(1, -1)
    # A default avoids leaking StopIteration when the target has no record
    # in `users`.
    target_likes = set(
        next((u["liked_songs"] for u in users if u["user_id"] == target_user_id), [])
    )

    cb_scores = {}
    for song in songs:
        title = song.get("title")
        # A repeated title maps to the same vector, so rescoring it would
        # give the same value.
        if title in target_likes or title not in song_vectors or title in cb_scores:
            continue
        cb_scores[title] = cosine_similarity(
            target_row, song_vectors[title].reshape(1, -1)
        )[0][0]

    hybrid_scores = {
        title: alpha * cb_scores.get(title, 0) + (1 - alpha) * cf_scores.get(title, 0)
        for title in cf_scores.keys() | cb_scores.keys()
    }

    # Break ties on the title: the titles come out of a set, so equal scores
    # would otherwise be ordered differently from run to run.
    sorted_recommendations = sorted(
        hybrid_scores.items(), key=lambda item: (-item[1], str(item[0]))
    )

    print(f"\n[하이브리드 추천 결과] (alpha={alpha}):")
    for i, (title, score) in enumerate(sorted_recommendations[:top_n], 1):
        print(f"{i}. {title} (점수: {score:.4f})")

    return [title for title, _ in sorted_recommendations[:top_n]]

# 7. 실행
if __name__ == "__main__":
    # Connection settings and the user to produce recommendations for.
    uri, db_name, target_user = "mongodb://localhost:27017/", "user", "최민호"

    # Load the raw data, then derive per-song and per-user genre vectors.
    users = load_users(uri)
    songs = load_all_songs(uri, db_name)
    song_vectors, genres = make_song_vectors(songs)
    user_vectors = make_user_vectors(users, song_vectors)

    # alpha=0.6 weights the content-based score at 60% and the
    # collaborative-filtering score at 40%.
    hybrid_recommend_songs(
        target_user, users, user_vectors, song_vectors, songs,
        top_k=2, top_n=5, alpha=0.6,
    )


In [1]:
from pymongo import MongoClient
import numpy as np
from gensim.models import Word2Vec
from sklearn.metrics.pairwise import cosine_similarity

# -----------------------------
def load_music_data(uri, db_name, collection_name):
    """Load all documents of one collection and print how many were read.

    Args:
        uri: MongoDB connection URI.
        db_name: Database name.
        collection_name: Collection to read.

    Returns:
        A list of documents (dicts) with the ``_id`` field removed.
    """
    # Using the client as a context manager closes it even if the query
    # raises; previously the client was never closed.
    with MongoClient(uri) as client:
        data = list(client[db_name][collection_name].find({}, {"_id": 0}))
    print(f"불러온 곡 개수: {len(data)}")
    return data

# -----------------------------
def make_string(text):
    """Convert a lyrics-like value to a single string.

    Args:
        text: A string, a list of fragments, ``None``, or any other value.

    Returns:
        ``""`` for ``None``; the space-joined items for a list (``None``
        items skipped, other items converted with ``str``); otherwise
        ``str(text)``.
    """
    if text is None:
        # str(None) would produce the literal text "None", which would then
        # be treated as a lyric token.
        return ""
    if isinstance(text, list):
        return " ".join(str(item) for item in text if item is not None)
    return str(text)

def make_genres_list(genres):
    """Normalize a genres field to a non-empty list of lower-case names.

    Args:
        genres: A list of genre strings, a single genre string, or any
            other value (for example ``None`` for a missing field).

    Returns:
        The stripped, lower-cased genre names. Blank and non-string entries
        are dropped. If nothing usable remains, ``["unknown"]`` is returned,
        so a missing field, an empty string and an empty list are all
        treated the same way.
    """
    if isinstance(genres, str):
        genres = [genres]
    if not isinstance(genres, list):
        return ["unknown"]

    cleaned = [g.strip().lower() for g in genres if isinstance(g, str) and g.strip()]
    return cleaned or ["unknown"]

# -----------------------------
def train_word2vec(music_data, vector_size=100, window=5, min_count=1, workers=4, epochs=5):
    """Train a Word2Vec model on whitespace-tokenized, lower-cased lyrics.

    Args:
        music_data: Iterable of song dicts; only ``lyrics`` is read.
        vector_size: Dimensionality of the word vectors.
        window: Context window size passed to ``Word2Vec``.
        min_count: Minimum word frequency passed to ``Word2Vec``.
        workers: Number of worker threads passed to ``Word2Vec``.
        epochs: Number of training epochs passed to ``Word2Vec``.

    Returns:
        The trained ``Word2Vec`` model.

    Raises:
        ValueError: If no song yields at least one lyric token.
    """
    tokenized = (
        make_string(song.get("lyrics", "")).lower().split() for song in music_data
    )
    # Songs whose lyrics are empty contribute no training sentence.
    sentences = [words for words in tokenized if words]

    if not sentences:
        raise ValueError('가사 문장이 없음')

    return Word2Vec(
        sentences,
        vector_size=vector_size,
        window=window,
        min_count=min_count,
        workers=workers,
        epochs=epochs,
    )

# -----------------------------
def get_lyrics_vector(lyrics, model):
    """Return the mean word vector of the lyrics' in-vocabulary tokens.

    The lyrics are lower-cased and split on whitespace. Tokens absent from
    ``model.wv`` are ignored. If no token is known, a zero vector of length
    ``model.vector_size`` is returned.
    """
    tokens = make_string(lyrics).lower().split()
    known_vectors = []
    for token in tokens:
        if token in model.wv:
            known_vectors.append(model.wv[token])

    if not known_vectors:
        return np.zeros(model.vector_size)
    return np.mean(known_vectors, axis=0)

def get_genre_vector(genres, all_genres):
    """Return a multi-hot vector marking the song's genres in ``all_genres``.

    ``genres`` is normalized with ``make_genres_list``. Position ``i`` of
    the result is 1 when ``all_genres[i]`` is one of the normalized genres
    and 0 otherwise.
    """
    wanted = make_genres_list(genres)
    onehot = np.zeros(len(all_genres))
    for position, genre_name in enumerate(all_genres):
        if genre_name in wanted:
            onehot[position] = 1
    return onehot

def make_song_vectors(music_data, model, all_genres):
    """Build one combined feature vector per titled song.

    Each vector is the lyrics vector followed by the genre vector. Songs
    with a missing or empty title are skipped. When titles repeat, the last
    song wins.

    Returns:
        A dict mapping title to the concatenated NumPy vector.
    """
    vectors = {}
    for song in music_data:
        title = song.get("title", "")
        if title:
            # Lyrics features come first, then the genre features.
            vectors[title] = np.concatenate([
                get_lyrics_vector(song.get("lyrics", ""), model),
                get_genre_vector(song.get("genres", ""), all_genres),
            ])
    return vectors

# -----------------------------
def load_user_item_matrix(uri, db_name="user"):
    """Build a binary user-by-song "liked" matrix from the user database.

    Each collection is treated as one user and the ``title`` field of its
    documents as that user's liked songs.

    Args:
        uri: MongoDB connection URI.
        db_name: Database holding one collection per user.

    Returns:
        A ``(user_ids, all_titles, matrix, user_likes)`` tuple:
        ``user_ids`` is the list of collection names (row order),
        ``all_titles`` the sorted distinct titles (column order),
        ``matrix`` a NumPy array with 1 where the user liked the title, and
        ``user_likes`` a dict mapping user id to its list of titles.
    """
    user_likes = {}
    # Using the client as a context manager closes it even if a query
    # raises; previously the client was never closed.
    with MongoClient(uri) as client:
        db = client[db_name]
        user_ids = db.list_collection_names()
        for user_id in user_ids:
            user_likes[user_id] = [
                doc["title"]
                for doc in db[user_id].find({}, {"_id": 0})
                if "title" in doc
            ]

    all_titles = sorted({title for titles in user_likes.values() for title in titles})
    title_index = {title: i for i, title in enumerate(all_titles)}

    matrix = np.zeros((len(user_ids), len(all_titles)))
    for row, user_id in enumerate(user_ids):
        for title in user_likes[user_id]:
            # Every liked title is a column by construction, so no
            # membership check is needed.
            matrix[row, title_index[title]] = 1

    return user_ids, all_titles, matrix, user_likes

# -----------------------------
def hybrid_recommendation(target_user, music_data, user_ids, matrix, user_likes, song_vectors, model_vector_size, all_genres, top_k=3, top_n=5, cf_weight=0.6):
    """Recommend songs by blending user similarity with content similarity.

    Candidates are the songs liked by the ``top_k`` users whose like-rows
    are most cosine-similar to the target's row, excluding songs the target
    already likes. A candidate's CF score is the highest similarity among
    the neighbours who liked it. Its content score is the cosine similarity
    between its vector and the mean vector of the target's liked songs.
    The final score is
    ``cf_weight * cf_score + (1 - cf_weight) * content_score``.

    Args:
        target_user: Id of the user to recommend for.
        music_data: Not read by this function; kept for interface
            compatibility.
        user_ids: List of user ids, in the row order of ``matrix``.
        matrix: User-by-song NumPy array of likes.
        user_likes: Mapping of user id to that user's liked titles.
        song_vectors: Mapping of title to combined feature vector.
        model_vector_size: Not read by this function; kept for interface
            compatibility.
        all_genres: Not read by this function; kept for interface
            compatibility.
        top_k: Number of most-similar users to draw candidates from.
        top_n: Maximum number of recommendations returned.
        cf_weight: Weight of the collaborative score. The default of 0.6
            reproduces the previous fixed 0.6 / 0.4 blend.

    Returns:
        A list of at most ``top_n`` ``(title, score)`` tuples, best first.

    Raises:
        ValueError: If the target user is unknown, has no liked songs, or
            none of the liked songs has a vector.
    """
    if target_user not in user_ids:
        raise ValueError("해당 유저 없음")

    # Validate the target's data before doing any similarity work.
    # De-duplicating with dict.fromkeys keeps the like order, so the mean
    # below is summed in the same order on every run.
    liked_in_order = list(dict.fromkeys(user_likes[target_user]))
    if not liked_in_order:
        raise ValueError("타겟 유저가 좋아한 곡이 없습니다.")
    target_likes = set(liked_in_order)

    # The mean vector of the target's liked songs is the content reference.
    ref_vectors = [song_vectors[song] for song in liked_in_order if song in song_vectors]
    if not ref_vectors:
        raise ValueError("기준이 될 곡 벡터가 없습니다.")
    ref_vec = np.mean(ref_vectors, axis=0).reshape(1, -1)

    target_vec = matrix[user_ids.index(target_user)].reshape(1, -1)
    similarities = [
        (uid, cosine_similarity(target_vec, matrix[i].reshape(1, -1))[0][0])
        for i, uid in enumerate(user_ids)
        if uid != target_user
    ]
    similarities.sort(key=lambda pair: pair[1], reverse=True)

    # A candidate's CF score is the best similarity among its likers.
    cf_candidates = {}
    for uid, sim in similarities[:top_k]:
        for song in user_likes[uid]:
            if song not in target_likes:
                cf_candidates[song] = max(cf_candidates.get(song, 0), sim)

    final_scores = []
    for song, cf_score in cf_candidates.items():
        cbf_vec = song_vectors.get(song)
        if cbf_vec is None:
            # No content vector for this song, so it cannot be scored.
            continue
        cbf_score = cosine_similarity(ref_vec, cbf_vec.reshape(1, -1))[0][0]
        final_scores.append((song, cf_weight * cf_score + (1 - cf_weight) * cbf_score))

    final_scores.sort(key=lambda pair: pair[1], reverse=True)
    return final_scores[:top_n]

# -----------------------------
if __name__ == "__main__":
    # Connection settings; the target user is read interactively.
    uri = "mongodb://localhost:27017/"
    music_db = "music"
    music_collection = "music"
    target_user = input("추천 받을 사용자 이름을 입력하세요: ").strip()

    # Load the songs and train the lyrics embedding model on them.
    music_data = load_music_data(uri, music_db, music_collection)
    model = train_word2vec(music_data)

    # The genre vocabulary is the sorted union of every song's genres.
    all_genres_set = set()
    for song in music_data:
        genres = make_genres_list(song.get("genres", ""))
        all_genres_set |= set(genres)
    all_genres = sorted(all_genres_set)

    song_vectors = make_song_vectors(music_data, model, all_genres)
    user_ids, all_titles, matrix, user_likes = load_user_item_matrix(uri)

    recommendations = hybrid_recommendation(
        target_user=target_user,
        music_data=music_data,
        user_ids=user_ids,
        matrix=matrix,
        user_likes=user_likes,
        song_vectors=song_vectors,
        model_vector_size=model.vector_size,
        all_genres=all_genres,
        top_k=3,
        top_n=5,
    )

    print(f"\n'{target_user}'에게 추천할 곡:")
    for title, score in recommendations:
        print(f"- {title} (점수: {score:.4f})")

불러온 곡 개수: 173


ValueError: 해당 유저 없음