## Test

In [None]:
# test dataset loader
from data import *
from torch.utils.data import DataLoader

dataset_dir = "" # test data mp3 경로 설정
test_dataset = create_contrastive_datasets(dataset_dir)

sample_rate = 44100  #[1, sample_rate*30]: 30초로 구간 설정
test_contrastive_dataset = ContrastiveDataset(test_dataset,  input_shape=[1, sample_rate*30])

batch_size = 68 
test_loader = DataLoader(test_contrastive_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

In [None]:
# embedding 추출
import torch
import pandas as pd

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

stage1_model = torch.load('') #모델 경로 설정(.pth)
stage1_model.eval() 
stage1_model.to(device)

music_data = []

for batch in test_loader:
    clip_a, clip_b, file_ids = batch
    clip_a, clip_b= clip_a.to(device), clip_b.to(device)

    with torch.no_grad(): 
        stage1_model = stage1_model.to(clip_a.device)
        emb1,emb2 = stage1_model(clip_a, clip_b, device)
        emb1 = emb1.cpu().numpy()

    for file_id, embedding in zip(file_ids, emb1):
        music_data.append({'id': file_id, 'embedding': embedding})


df = pd.DataFrame(music_data)
df.to_csv("", index=False) #.csv 파일 저장 경로 설정
print(df)

## Evaluate
- 임베딩 기준으로 코사인 유사도 계산 -> recall@k 계산
1. threshold 값 정의 및 추천 곡 정의  
: GT 유사도 값에서 threshold 값 이상의 노래들만 추천 노래로 봄
  
2. k개의 추천 곡 정의  
: 임베딩 기준으로 코사인 유사도 계산해서, 상위 k개의 노래를 가져옴
  
3. k개의 추천 곡과 GT 추천 곡의 교집합   
: 1번과 2번 곡의 공통된 곡의 개수를 셈
  
4. recall@k 계산   
: 3번의 개수 / 1번의 개수

- gt : 앵커 노래 + threshold 유사도를 넘긴 나머지 노래

In [None]:
# 임베딩 사이의 코사인 유사도 계산
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def compute_sim_k(df, k=10):
    """ 각 id의 embedding 벡터를 사용하여 코사인 유사도를 계산하고, 상위 n개 유사한 트랙을 반환 """

    # embedding 열을 numpy 배열로 변환
    df["embedding"] = df["embedding"].apply(lambda x: np.array(x))

    # 모든 트랙 id와 임베딩을 가져옴
    track_ids = df["id"].values
    embeddings = np.stack(df["embedding"].values)  # (num_samples, embedding_dim)

    # 모든 트랙 간의 코사인 유사도 계산
    cosine_sim_matrix = cosine_similarity(embeddings)

    # 결과 저장 리스트
    similarity_results = {}
    
    # 각 앵커 id에 대해 유사도가 높은 10개 트랙 찾기
    for i, anchor_id in enumerate(track_ids):
        similarities = cosine_sim_matrix[i]  # 현재 앵커와 모든 트랙 간 유사도 벡터
        similarities[i] = -1 # 자기 자신 제외외
        
        # 유사도 높은 10개 트랙 선택
        top_indices = np.argsort(similarities)[-1*k:][::-1]  # 내림차순 정렬
        top_similarities = similarities[top_indices]  # 해당 유사도 값
        top_track_ids = track_ids[top_indices]  # 해당 트랙 id
        
        # 결과 저장 (트랙 id, 유사도)
        similarity_results[anchor_id] = [(track, float(sim)) for track, sim in zip(top_track_ids, top_similarities)] 

    return similarity_results

In [None]:
def calculate_recall_at_k(predicted_sim, gt_df, k=10):
    '''
    input :
    predicted_sim : dictionary (key:anchor_id, values:(track_id,sim_values))
    gt_df : dataframe ('anchor_id': anchor_id, 'Track ID': {track_id : sim_value})
    '''
    anchor_ids = gt_df["anchor_id"].values
    ground_truth_sets = gt_df["Track ID"].apply(eval).apply(set).values  # GT track_id의 sim values 집합
    
    recall_scores = []
    for anchor_id, ground_truth in zip(anchor_ids, ground_truth_sets):
        if anchor_id in predicted_sim:
            pred_top_k = predicted_sim[anchor_id] 
            intersection = pred_top_k & ground_truth  # GT와 일치하는 예측 개수 (분자자)
            recall = len(intersection) / len(ground_truth)  # Recall 계산
            recall_scores.append(recall)

    return np.mean(recall_scores)  # 전체 평균 Recall@10

In [None]:
# recall@k 실행 코드
k = 10

# 1. 모델 예측된 유사도 결과 계산
predicted_sim = compute_sim_k(df, k=10)

# 2. Ground Truth 데이터 로드
gt_df = pd.read_csv("ground_truth.csv")  # ground truth 파일 경로 설정

# 3. Recall@10 계산
recall_at_10 = calculate_recall_at_k(predicted_sim, gt_df, k=10)

print(f"Recall@10: {recall_at_10:.4f}")