## 프로젝트 세팅


### 경로 설정

In [None]:
from google.colab import drive

# Mount Google Drive so the project files under /content/drive are reachable.
drive.mount(mountpoint='/content/drive')

In [None]:
# Project root on the mounted Google Drive.
BASE_DIR = '/content/drive/MyDrive/Filterify/filtering_gun'
# Sub-directory of the project root that holds the service modules.
SERVICES_DIR = f'{BASE_DIR}/services'

### 서비스 모듈 임포트

In [None]:
import sys
import os
# Make the service modules importable. The membership guard keeps sys.path
# free of duplicate entries when this cell is re-run in the same session.
if SERVICES_DIR not in sys.path:
    sys.path.append(SERVICES_DIR)

from run_filtering import (
    run_filtering,
    batch_filtering,
    evaluate_filtering,
    batch_evaluate_filtering
)
from Predictor import initialize_audio_model

### 라이브러리 임포트

In [None]:
import torch
import numpy as np
import warnings
warnings.filterwarnings('ignore')

## 필터링 아키텍처

In [None]:
class ProductionPlaylistFilter:
    """Ratio-based playlist filter driven by multimodal track similarity.

    The filter reads tracks from ``dataset`` by integer index. Each item is
    accessed like a mapping and must provide the feature tensors
    ``'theme_desc_feature'``, ``'tone_desc_feature'``, ``'audio_features'``
    and ``'cover_feature'``, plus ``'track_name'`` and ``'artists'`` for
    reporting.
    """

    def __init__(self, dataset=None, device=None):
        """Store the dataset/device and set the default keep ratio (0.8).

        Args:
            dataset: Indexable collection of track items (see class docstring).
            device: Stored on the instance; not used by the methods of this class.
        """
        self.dataset = dataset
        self.device = device
        self.default_keep_ratio = 0.8

        print("🎯 Production Filter initialized")
        print(f"   Strategy: ratio-based filtering (keep_ratio={self.default_keep_ratio})")

    def calculate_similarity(self, data1, data2):
        """Return the weighted multimodal similarity of two track items.

        Text similarity is the mean of the theme and tone cosine similarities.
        The final score is ``0.4 * text + 0.3 * audio + 0.3 * visual``.

        Args:
            data1: First track item (mapping of feature name to tensor).
            data2: Second track item.

        Returns:
            The combined similarity as a float, or ``0.5`` as a neutral
            fallback if anything goes wrong (missing key, incompatible
            tensors, ...). The error is printed, not raised, so that one bad
            track does not abort the whole filtering run.
        """
        def cosine(key):
            # unsqueeze(0) adds a leading batch axis before the comparison;
            # assumes each feature is a 1-D tensor so .item() sees exactly
            # one value -- TODO confirm against the dataset.
            return torch.cosine_similarity(
                data1[key].unsqueeze(0),
                data2[key].unsqueeze(0)
            ).item()

        try:
            # Text similarity (theme + tone)
            theme_sim = cosine('theme_desc_feature')
            tone_sim = cosine('tone_desc_feature')
            text_sim = (theme_sim + tone_sim) / 2

            # Audio and visual similarity
            audio_sim = cosine('audio_features')
            visual_sim = cosine('cover_feature')

            # Multimodal similarity (weighted combination)
            return (text_sim * 0.4 + audio_sim * 0.3 + visual_sim * 0.3)

        except Exception as e:
            print(f"⚠️ Similarity calculation error: {e}")
            return 0.5

    def _pair_similarities(self, items):
        """Score every unordered pair of ``items`` exactly once.

        Returns:
            A dict keyed by ``(i, j)`` with ``i < j`` (positions in ``items``),
            filled in row-major order.
        """
        pair_sims = {}
        for i, left in enumerate(items):
            for j in range(i + 1, len(items)):
                pair_sims[(i, j)] = self.calculate_similarity(left, items[j])
        return pair_sims

    def analyze_playlist_characteristics(self, track_indices):
        """Summarise how similar the given tracks are to each other.

        Args:
            track_indices: Dataset indices of the tracks to analyse. ``None``
                is treated like an empty selection.

        Returns:
            A dict with ``'track_count'``, ``'diversity_score'``
            (``1 - mean pairwise similarity``), ``'cohesion_score'`` (mean
            pairwise similarity) and ``'recommended_keep_ratio'`` (always the
            instance default). With fewer than two tracks both scores are
            ``0.0``; if the analysis fails both scores fall back to ``0.5``.
        """
        track_count = 0 if track_indices is None else len(track_indices)
        if track_count <= 1:
            return {
                'track_count': track_count,
                'diversity_score': 0.0,
                'cohesion_score': 0.0,
                'recommended_keep_ratio': self.default_keep_ratio
            }

        try:
            # Fetch each track once; the dataset lookup may be expensive.
            items = [self.dataset[idx] for idx in track_indices]

            # At least two tracks are present here, so there is at least one pair.
            similarities = list(self._pair_similarities(items).values())
            cohesion_score = np.mean(similarities)
            diversity_score = 1.0 - cohesion_score

            return {
                'track_count': track_count,
                'diversity_score': diversity_score,
                'cohesion_score': cohesion_score,
                'recommended_keep_ratio': self.default_keep_ratio
            }

        except Exception as e:
            print(f"⚠️ Playlist analysis error: {e}")
            return {
                'track_count': track_count,
                'diversity_score': 0.5,
                'cohesion_score': 0.5,
                'recommended_keep_ratio': self.default_keep_ratio
            }

    def filter_ratio_based(self, track_indices, keep_ratio=0.8):
        """Keep the best-scoring fraction of the tracks, in their original order.

        The first entry of ``track_indices`` is the reference track and always
        receives the top score of ``1.0``. Every other track is scored as
        ``0.6 * similarity_to_first + 0.4 * mean_similarity_to_all_others``.

        Args:
            track_indices: Indexable sequence of dataset indices.
            keep_ratio: Fraction of tracks to keep; at least one track is kept.

        Returns:
            ``track_indices`` itself when it has two or fewer entries,
            otherwise a new list with the ``max(1, int(len * keep_ratio))``
            best-scoring entries in input order.
        """
        if len(track_indices) <= 2:
            return track_indices

        num_to_keep = max(1, int(len(track_indices) * keep_ratio))

        # Load every track once and score every pair once, instead of
        # re-fetching and re-scoring inside the nested loop.
        items = [self.dataset[idx] for idx in track_indices]
        pair_sims = self._pair_similarities(items)

        def sim(a, b):
            # Pairs are stored with the smaller position first.
            return pair_sims[(a, b)] if a < b else pair_sims[(b, a)]

        positions = range(len(items))
        scores = [(0, 1.0)]  # the first track always gets the top score
        for pos in positions[1:]:
            # Relevance to the first track
            sim_to_first = sim(0, pos)

            # Overall harmony: mean similarity to every other track
            # (never empty, because at least three tracks reach this point).
            avg_sim = np.mean([sim(pos, other) for other in positions if other != pos])

            scores.append((pos, sim_to_first * 0.6 + avg_sim * 0.4))

        # Stable sort: ties keep their input order.
        scores.sort(key=lambda x: x[1], reverse=True)

        # Select by position rather than by value, so duplicate indices in the
        # input cannot inflate the result beyond num_to_keep.
        kept_positions = sorted(pos for pos, _ in scores[:num_to_keep])
        return [track_indices[pos] for pos in kept_positions]

    def predict_removal(self, track_indices=None, **kwargs):
        """Predict which tracks to remove, using the ratio strategy only.

        Args:
            track_indices: Dataset indices to evaluate. When ``None``, every
                index except 0 is evaluated.
            **kwargs: ``keep_ratio`` overrides the recommended ratio; a
                missing or ``None`` value uses the recommendation.

        Returns:
            One dict per evaluated index with the keys ``'track'``,
            ``'artist'``, ``'remove_pred'``, ``'kept'`` and ``'track_index'``.

        Raises:
            ValueError: If no dataset was provided or it is empty.
            Exception: Any error raised while filtering is printed and re-raised.
        """
        dataset = self.dataset
        # Test for None / zero length explicitly: plain truthiness is
        # ambiguous for array-like datasets.
        if dataset is None or (hasattr(dataset, '__len__') and len(dataset) == 0):
            raise ValueError("Dataset is required for filtering")

        if track_indices is None:
            # NOTE(review): index 0 is excluded here, so filter_ratio_based
            # uses dataset index 1 as its reference "first track" -- confirm
            # that this is the intended anchor.
            track_indices = list(range(1, len(self.dataset)))
            print(f"🎵 Excluding anchor track (index 0), evaluating tracks 1-{len(self.dataset)-1}")

        # Analyse playlist characteristics
        playlist_chars = self.analyze_playlist_characteristics(track_indices)

        # Resolve keep_ratio; an explicit None falls back to the recommendation.
        keep_ratio = kwargs.get('keep_ratio')
        if keep_ratio is None:
            keep_ratio = playlist_chars['recommended_keep_ratio']

        print(f"🎯 Ratio-based Filtering for {len(track_indices)} tracks")
        print(f"   Playlist characteristics: diversity={playlist_chars['diversity_score']:.3f}, cohesion={playlist_chars['cohesion_score']:.3f}")
        print(f"   Using keep_ratio={keep_ratio}")

        try:
            final_indices = self.filter_ratio_based(track_indices, keep_ratio)
            print(f"   Result: {len(final_indices)}/{len(track_indices)} tracks kept")

            # Build the set once so each membership test is O(1).
            kept_indices = set(final_indices)

            # Build the prediction records
            predictions = []
            for i in track_indices:
                track_data = self.dataset[i]
                is_kept = i in kept_indices
                predictions.append({
                    'track': track_data['track_name'],
                    'artist': track_data['artists'],
                    'remove_pred': not is_kept,
                    'kept': is_kept,
                    'track_index': i
                })

            return predictions

        except Exception as e:
            print(f"❌ Error in ratio-based filtering: {e}")
            raise

## 모델 초기화

In [None]:
# Device selection: use CUDA when it is available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Initialise the audio predictor model from the checkpoint under BASE_DIR.
audio_model_path = os.path.join(
    BASE_DIR,
    "optimal_similarity_1.5_0.003418147563934326.pth",
)
audio_model = initialize_audio_model(audio_model_path, device)
# Presumably a torch.nn.Module: eval() switches it to inference mode -- confirm.
audio_model.eval()

## 필터링 및 성능 평가

### 단일 플레이리스트 필터링

In [None]:
def filter_playlist(playlist_name, **params):
    """Filter a single playlist through the services module.

    Forwards the notebook-level ``BASE_DIR``, ``audio_model`` and ``device``,
    together with the ``ProductionPlaylistFilter`` class, to ``run_filtering``.

    Args:
        playlist_name: Name of the playlist to filter.
        **params: Extra keyword arguments passed through to ``run_filtering``.

    Returns:
        Whatever ``run_filtering`` returns.
    """
    fixed_kwargs = {
        'base_dir': BASE_DIR,
        'playlist_name': playlist_name,
        'Filter': ProductionPlaylistFilter,
        'audio_model': audio_model,
        'device': device,
    }
    return run_filtering(**fixed_kwargs, **params)

### 여러 플레이리스트 필터링

In [None]:
def filter_playlists_batch(playlist_names, **params):
    """Filter several playlists through the services module.

    Forwards the notebook-level ``BASE_DIR``, ``audio_model`` and ``device``,
    together with the ``ProductionPlaylistFilter`` class, to ``batch_filtering``.

    Args:
        playlist_names: Names of the playlists to filter.
        **params: Extra keyword arguments passed through to ``batch_filtering``.

    Returns:
        Whatever ``batch_filtering`` returns.
    """
    fixed_kwargs = {
        'base_dir': BASE_DIR,
        'playlist_names': playlist_names,
        'Filter': ProductionPlaylistFilter,
        'audio_model': audio_model,
        'device': device,
    }
    return batch_filtering(**fixed_kwargs, **params)

### 단일 플레이리스트 평가

In [None]:
def evaluate_playlist(playlist_name, **params):
    """Evaluate filtering on a single playlist through the services module.

    Forwards the notebook-level ``BASE_DIR``, ``audio_model`` and ``device``,
    together with the ``ProductionPlaylistFilter`` class, to
    ``evaluate_filtering``.

    Args:
        playlist_name: Name of the playlist to evaluate.
        **params: Extra keyword arguments passed through to ``evaluate_filtering``.

    Returns:
        Whatever ``evaluate_filtering`` returns.
    """
    fixed_kwargs = {
        'base_dir': BASE_DIR,
        'playlist_name': playlist_name,
        'Filter': ProductionPlaylistFilter,
        'audio_model': audio_model,
        'device': device,
    }
    return evaluate_filtering(**fixed_kwargs, **params)

### 여러 플레이리스트 평가

In [None]:
def evaluate_playlists_batch(playlist_names, **params):
    """Evaluate filtering on several playlists through the services module.

    Forwards the notebook-level ``BASE_DIR``, ``audio_model`` and ``device``,
    together with the ``ProductionPlaylistFilter`` class, to
    ``batch_evaluate_filtering``.

    Args:
        playlist_names: Names of the playlists to evaluate.
        **params: Extra keyword arguments passed through to
            ``batch_evaluate_filtering``.

    Returns:
        Whatever ``batch_evaluate_filtering`` returns.
    """
    fixed_kwargs = {
        'base_dir': BASE_DIR,
        'playlist_names': playlist_names,
        'Filter': ProductionPlaylistFilter,
        'audio_model': audio_model,
        'device': device,
    }
    return batch_evaluate_filtering(**fixed_kwargs, **params)