In [None]:
import json
import pandas as pd
import numpy as np
import os
from pathlib import Path
import laion_clap
import torch
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import warnings
from collections import Counter
import math
# Silence every warning category for the rest of the session. Note that this also
# hides deprecation and numerical warnings raised by the libraries imported above.
warnings.filterwarnings('ignore')

# Reached only if every import above succeeded.
print("Libraries imported successfully!")


In [None]:
# Read the per-clip input / ground-truth records from disk
pairs_path = 'data/input_ground_truth_pairs.json'
with open(pairs_path, 'r') as f:
    input_gt_pairs = json.load(f)

num_pairs = len(input_gt_pairs)
print(f"Loaded {num_pairs} sound clips with input/ground truth pairs")

# Show the first record so the schema is visible in the notebook output
first_pair = input_gt_pairs[0]
print(f"Example entry: {first_pair}")


In [None]:
# Read the filtered BSD10k metadata table (one row per sound clip)
metadata_csv_path = 'data/BSD10k/BSD10K_metadata_filtered.csv'
metadata_df = pd.read_csv(metadata_csv_path)

num_metadata_rows = len(metadata_df)
print(f"Loaded metadata for {num_metadata_rows} sound clips")
print(metadata_df.head())


In [None]:
# Read the CLAP tag vocabulary: one tag per line, surrounding whitespace removed
with open('data/tagset_clap_normalized_hyphen_unique.txt', 'r') as f:
    clap_tags = list(map(str.strip, f))

num_clap_tags = len(clap_tags)
print(f"Loaded {num_clap_tags} unique CLAP tags")
print(f"First 10 tags: {clap_tags[:10]}")


In [None]:
# Compute document frequency from metadata
print("Computing document frequency scores...")

# Number of documents (metadata rows) each lowercase tag appears in
tag_document_counts = Counter()
total_documents = len(metadata_df)

# Iterate over the column itself: DataFrame.iterrows() would build a full Series
# for every row even though only the 'tags' cell is needed.
for raw_tags in tqdm(metadata_df['tags'], total=total_documents, desc="Processing documents"):
    # pandas reads an empty cell as NaN (a float), which has no .split();
    # such a document simply contributes no tags.
    if not isinstance(raw_tags, str):
        continue

    # Normalize to lowercase and de-duplicate so each tag counts once per document
    unique_tags = {tag.strip().lower() for tag in raw_tags.split(',')}
    # "a,,b" or a trailing comma would otherwise register the empty string as a tag
    unique_tags.discard('')
    tag_document_counts.update(unique_tags)

print(f"Found {len(tag_document_counts)} unique tags in the dataset")
print(f"Total documents: {total_documents}")

# Show top 10 most frequent tags
print("\nTop 10 most frequent tags:")
for tag, count in tag_document_counts.most_common(10):
    percentage = (count / total_documents) * 100
    print(f"{tag}: {count} documents ({percentage:.1f}%)")


In [None]:
# Build the DF weight lookup for every tag of the CLAP vocabulary
print("Creating DF mapping for CLAP tags...")

# Interpolation strength (tunable): 0 disables DF weighting, 1 uses the normalized DF alone
ALPHA = 0.7

tag_df_scores = {}
matched_tags = 0
unmatched_tags = 0

for tag in clap_tags:
    df_count = tag_document_counts.get(tag.lower())

    if df_count is None:
        # Tag never occurs in the metadata (df = 0):
        # (1 - alpha) + alpha * (log(1 + 0) / log(1 + N)) = (1 - alpha), the minimum weight
        tag_df_scores[tag] = 1 - ALPHA
        unmatched_tags += 1
        continue

    # weight = (1 - alpha) + alpha * (log(1 + df) / log(1 + N))
    normalized_df = math.log(1 + df_count) / math.log(1 + total_documents)
    tag_df_scores[tag] = (1 - ALPHA) + ALPHA * normalized_df
    matched_tags += 1

print(f"\nDF Mapping Results:")
print(f"Matched tags: {matched_tags}")
print(f"Unmatched tags: {unmatched_tags}")
print(f"Alpha parameter: {ALPHA}")

# Summary statistics of the resulting weights
df_scores_values = list(tag_df_scores.values())
lowest_score = min(df_scores_values)
highest_score = max(df_scores_values)
print(f"\nDF Score Statistics:")
print(f"Min DF score: {lowest_score:.4f}")
print(f"Max DF score: {highest_score:.4f}")
print(f"Mean DF score: {np.mean(df_scores_values):.4f}")
print(f"Std DF score: {np.std(df_scores_values):.4f}")


In [None]:
# Read the pre-computed, hyphen-normalized tag embeddings from disk
print("Loading pre-computed tag embeddings...")

tags_embeddings_dir = Path('data/BSD10k/embeddings/tags')
embeddings_path = tags_embeddings_dir / 'tag_embeddings_hyphen.npy'

tag_embeddings_array = np.load(embeddings_path)
print(f"Loaded tag embeddings with shape: {tag_embeddings_array.shape}")

# Row i of the array is taken to belong to clap_tags[i]. If the two lengths
# disagree, both sides are cut to the shorter one so indexing stays in range.
num_tags = min(len(clap_tags), tag_embeddings_array.shape[0])
if len(clap_tags) == tag_embeddings_array.shape[0]:
    tag_names_list = clap_tags
else:
    print(f"WARNING: Mismatch between clap_tags ({len(clap_tags)}) and embeddings ({tag_embeddings_array.shape[0]})")
    tag_names_list = clap_tags[:num_tags]
    tag_embeddings_array = tag_embeddings_array[:num_tags]

print(f"Number of tags: {num_tags}")
print(f"Embedding dimension: {tag_embeddings_array.shape[2]}")
print(f"Sentence variants per tag: {tag_embeddings_array.shape[1]}")

# Re-key the array by tag name: tag -> that tag's block of variant embeddings
tag_embeddings = dict(zip(tag_names_list, tag_embeddings_array))

print(f"\nConverted to dictionary format with {len(tag_embeddings)} tags")
print(f"Example tag embedding shape: {list(tag_embeddings.values())[0].shape}")


In [None]:
# Find out which clips have an audio embedding file (files are named <sound_id>.npy)
embeddings_dir = Path('data/BSD10k/embeddings/clap')
available_embeddings = {int(npy_file.stem) for npy_file in embeddings_dir.glob('*.npy')}

print(f"Found {len(available_embeddings)} audio embeddings")

# Keep only the ground-truth records whose clip has an embedding on disk
filtered_pairs = []
for candidate in input_gt_pairs:
    if candidate['sound_id'] in available_embeddings:
        filtered_pairs.append(candidate)

print(f"Using {len(filtered_pairs)} clips with both ground truth and audio embeddings")


In [None]:
# Read the audio embedding of every clip that survived the filtering step
print("Loading audio embeddings...")

audio_embeddings = {}

for pair in tqdm(filtered_pairs):
    clip_id = pair['sound_id']
    clip_file = embeddings_dir / f"{clip_id}.npy"

    # Skip clips whose file is not present at load time
    if not clip_file.exists():
        continue
    audio_embeddings[clip_id] = np.load(clip_file)

print(f"Loaded audio embeddings for {len(audio_embeddings)} sound clips")
if audio_embeddings:
    # Report the shape of one embedding as a quick sanity check
    sample_embedding = next(iter(audio_embeddings.values()))
    print(f"Audio embedding dimension: {sample_embedding.shape}")


In [None]:
# Sentence-BERT model used to compare tags semantically
sbert_model_name = 'all-MiniLM-L6-v2'
print("Loading SBERT model...")
sbert_model = SentenceTransformer(sbert_model_name)
print("SBERT model loaded successfully!")

# Threshold on SBERT cosine similarity used by the semantic evaluation (adjustable)
SIMILARITY_THRESHOLD = 0.7
print(f"Semantic similarity threshold: {SIMILARITY_THRESHOLD}")


In [None]:
# Embed every tag that can show up in the evaluation with SBERT
print("Encoding tags with SBERT...")

# Lowercased ground-truth tags of all evaluated clips
all_gt_tags = set()
for pair in filtered_pairs:
    for gt_tag in pair['ground_truth_tags']:
        all_gt_tags.add(gt_tag.lower())

# Add the lowercased CLAP vocabulary so predicted tags can be looked up as well
clap_tags_lower = {tag.lower() for tag in clap_tags}
all_unique_tags = list(all_gt_tags | clap_tags_lower)
print(f"Total unique tags to encode: {len(all_unique_tags)}")

# One SBERT vector per tag, in the order of all_unique_tags
tag_sbert_embeddings = sbert_model.encode(all_unique_tags, show_progress_bar=True)
print(f"SBERT embeddings shape: {tag_sbert_embeddings.shape}")

# Lookup table: lowercase tag -> SBERT vector
tag_to_sbert = dict(zip(all_unique_tags, tag_sbert_embeddings))
print("SBERT encoding completed!")


In [None]:
def get_tag_recommendations_df_weighted(audio_embedding, tag_embeddings, tag_df_scores, top_k=10):
    """
    Rank tags for one audio clip by DF-weighted CLAP similarity.

    For each tag, the cosine similarity between the audio embedding and every
    sentence-variant embedding of that tag is computed, the maximum over the
    variants is kept, and that value is multiplied by the tag's DF weight:

        final score = max_over_variants(cosine_similarity) * df_weight
        df_weight   = (1 - alpha) + alpha * (log(1 + df) / log(1 + N))

    Args:
        audio_embedding: Audio embedding of length D. A single-row array of
            shape (1, D) is accepted as well and flattened.
        tag_embeddings: Mapping tag -> array of shape (num_variants, D). A single
            1-D vector is treated as one variant.
        tag_df_scores: Mapping tag -> DF weight. A tag missing from the mapping
            gets weight 1.0, i.e. its similarity is left unweighted.
        top_k: Number of tags to return.

    Returns:
        Tuple of four parallel lists, best tag first:
        (tags, weighted scores, unweighted base similarities, DF weights).
    """
    # Normalize the audio vector once; it is the same for every tag.
    audio_vec = np.asarray(audio_embedding).reshape(-1)
    audio_norm = np.linalg.norm(audio_vec)
    if audio_norm > 0:
        audio_vec = audio_vec / audio_norm

    scored = []
    for tag, text_embeds in tag_embeddings.items():
        variants = np.atleast_2d(np.asarray(text_embeds))
        variant_norms = np.linalg.norm(variants, axis=1)
        # An all-zero variant gets similarity 0 instead of a division by zero
        variant_norms[variant_norms == 0] = 1.0

        # Cosine similarity of the audio clip with each variant; keep the best one
        base_similarity = np.max((variants @ audio_vec) / variant_norms)

        df_weight = tag_df_scores.get(tag, 1.0)
        scored.append((tag, base_similarity * df_weight, base_similarity, df_weight))

    # Stable sort: tags with equal weighted scores keep their dictionary order
    scored.sort(key=lambda entry: entry[1], reverse=True)
    best = scored[:top_k]

    top_tags = [entry[0] for entry in best]
    top_scores = [entry[1] for entry in best]
    top_base_scores = [entry[2] for entry in best]
    top_df_weights = [entry[3] for entry in best]

    return top_tags, top_scores, top_base_scores, top_df_weights


In [None]:
def compute_semantic_hits(predicted_tags, ground_truth_tags, tag_to_sbert, similarity_threshold=0.7):
    """
    Find the predicted tags that semantically match a ground-truth tag.

    Tags are compared case-insensitively through their SBERT embeddings. A
    predicted tag is a hit when its highest cosine similarity to any
    ground-truth tag is at least `similarity_threshold`.

    Args:
        predicted_tags: Predicted tag strings.
        ground_truth_tags: Ground-truth tag strings.
        tag_to_sbert: Mapping lowercase tag -> embedding vector. Tags without an
            entry are skipped on either side.
        similarity_threshold: Minimum cosine similarity for a hit.

    Returns:
        (hits, semantic_matches): `hits` lists the lowercase predicted tags that
        matched, in prediction order; `semantic_matches` holds one dict per hit
        with keys 'predicted', 'matched_gt' (the most similar ground-truth tag)
        and 'similarity' (a Python float).
    """
    hits = []
    semantic_matches = []

    # Ground-truth tags that can be compared: lowercased, de-duplicated in
    # first-occurrence order, and present in the embedding table.
    gt_tags = [
        gt_tag
        for gt_tag in dict.fromkeys(tag.lower() for tag in ground_truth_tags)
        if gt_tag in tag_to_sbert
    ]
    if not gt_tags:
        # Nothing to match against, so nothing can be a hit (whatever the threshold)
        return hits, semantic_matches

    # Stack and normalize the ground-truth vectors once, not once per predicted tag
    gt_matrix = np.asarray([tag_to_sbert[gt_tag] for gt_tag in gt_tags], dtype=float)
    gt_norms = np.linalg.norm(gt_matrix, axis=1)
    gt_norms[gt_norms == 0] = 1.0  # zero vectors get similarity 0 instead of NaN
    gt_unit = gt_matrix / gt_norms[:, None]

    for pred_tag in (tag.lower() for tag in predicted_tags):
        if pred_tag not in tag_to_sbert:
            continue

        pred_vec = np.asarray(tag_to_sbert[pred_tag], dtype=float)
        pred_norm = np.linalg.norm(pred_vec)
        if pred_norm > 0:
            pred_vec = pred_vec / pred_norm

        # Cosine similarity to every ground-truth tag; argmax keeps the first best one
        similarities = gt_unit @ pred_vec
        best_index = int(np.argmax(similarities))
        best_similarity = float(similarities[best_index])

        if best_similarity >= similarity_threshold:
            hits.append(pred_tag)
            semantic_matches.append({
                'predicted': pred_tag,
                'matched_gt': gt_tags[best_index],
                'similarity': best_similarity
            })

    return hits, semantic_matches

# Smoke-test the matcher on a small hand-picked percussion example
test_pred = ['percussion', 'beat', 'rhythm']
test_gt = ['drum', 'drums', 'drumming']
test_hits, test_matches = compute_semantic_hits(
    predicted_tags=test_pred,
    ground_truth_tags=test_gt,
    tag_to_sbert=tag_to_sbert,
    similarity_threshold=SIMILARITY_THRESHOLD,
)
print(f"Test semantic hits: {test_hits}")
print(f"Test matches: {test_matches}")


In [None]:
# Generate DF-weighted recommendations for all clips with SBERT evaluation
print("Generating DF-weighted recommendations with SBERT evaluation...")

# Number of recommendations requested per clip; also the Precision@K denominator
TOP_K = 10

results = []

for pair in tqdm(filtered_pairs):
    sound_id = pair['sound_id']

    # Clips whose embedding was not loaded are left out of the evaluation
    if sound_id not in audio_embeddings:
        continue

    audio_embedding = audio_embeddings[sound_id]

    # Top-K DF-weighted recommendations for this clip
    predicted_tags, prediction_scores, base_scores, df_weights = get_tag_recommendations_df_weighted(
        audio_embedding, tag_embeddings, tag_df_scores, top_k=TOP_K
    )

    # Normalize ground truth tags to lowercase for comparison
    ground_truth_tags = [tag.lower() for tag in pair['ground_truth_tags']]
    unique_gt_tags = set(ground_truth_tags)

    # Semantic hits: predicted tags whose SBERT similarity to some GT tag reaches the threshold
    semantic_hits, semantic_matches = compute_semantic_hits(
        predicted_tags, ground_truth_tags, tag_to_sbert, SIMILARITY_THRESHOLD
    )

    # Exact hits for comparison, de-duplicated and kept in prediction order so the
    # saved output is reproducible (a set intersection has no stable order).
    predicted_tags_lower = [tag.lower() for tag in predicted_tags]
    exact_hits = [tag for tag in dict.fromkeys(predicted_tags_lower) if tag in unique_gt_tags]

    # Precision@K: share of the K recommendations that are semantic hits
    precision = len(semantic_hits) / TOP_K

    # Recall: share of distinct ground-truth tags matched by at least one prediction.
    # Dividing the number of predicted hits by |GT| instead would exceed 1.0 as soon
    # as several predictions match the same ground-truth tag. Each prediction only
    # reports its single best GT match, so this is a lower bound on coverage.
    matched_gt_tags = {
        match['matched_gt'] for match in semantic_matches if match['matched_gt'] is not None
    }
    recall = len(matched_gt_tags) / len(unique_gt_tags) if unique_gt_tags else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    result = {
        'sound_id': sound_id,
        'title': pair['title'],
        'ground_truth_tags': ground_truth_tags,
        'predicted_tags': predicted_tags,
        'prediction_scores': prediction_scores,
        'base_scores': base_scores,
        'df_weights': df_weights,
        'semantic_hits': semantic_hits,
        'semantic_matches': semantic_matches,
        'exact_hits': exact_hits,
        'num_semantic_hits': len(semantic_hits),
        'num_exact_hits': len(exact_hits),
        'precision_at_10': precision,
        'recall': recall,
        'f1_score': f1
    }

    results.append(result)

print(f"Generated DF-weighted recommendations with SBERT evaluation for {len(results)} clips")


In [None]:
# Calculate overall metrics
num_clips = len(results)
total_semantic_hits = sum(result['num_semantic_hits'] for result in results)
total_exact_hits = sum(result['num_exact_hits'] for result in results)
total_predictions = num_clips * 10
total_ground_truth = sum(len(result['ground_truth_tags']) for result in results)

# Distinct ground-truth tags covered by at least one prediction, summed over clips.
# This is the recall numerator: the raw number of predicted hits can exceed the
# number of ground-truth tags when several predictions match the same tag.
total_matched_gt = sum(
    len({match['matched_gt'] for match in result['semantic_matches'] if match['matched_gt'] is not None})
    for result in results
)


def _mean_or_zero(values):
    """Return the mean of `values`, or 0.0 when the list is empty (np.mean would give NaN)."""
    return float(np.mean(values)) if values else 0.0


# Macro averages: every clip weighs the same
avg_precision_at_10 = _mean_or_zero([result['precision_at_10'] for result in results])
avg_recall = _mean_or_zero([result['recall'] for result in results])
avg_semantic_hits_per_clip = _mean_or_zero([result['num_semantic_hits'] for result in results])
avg_exact_hits_per_clip = _mean_or_zero([result['num_exact_hits'] for result in results])

# Micro averages: pooled over all predictions / ground-truth tags; guarded so an
# empty evaluation set reports 0 instead of raising ZeroDivisionError
overall_precision = total_semantic_hits / total_predictions if total_predictions else 0.0
overall_recall = total_matched_gt / total_ground_truth if total_ground_truth else 0.0
overall_f1 = 2 * (overall_precision * overall_recall) / (overall_precision + overall_recall) if (overall_precision + overall_recall) > 0 else 0

# Calculate average F1 score per clip
f1_scores = [result['f1_score'] for result in results]
avg_f1 = _mean_or_zero(f1_scores)

print(f"=== CLAP DF-Weighted Tag Recommendation Results with SBERT Evaluation ===")
print(f"Alpha = {ALPHA}, SBERT Threshold = {SIMILARITY_THRESHOLD}")
print(f"Number of clips evaluated: {len(results)}")
print(f"Total semantic hits: {total_semantic_hits}")
print(f"Total exact hits: {total_exact_hits}")
print(f"Total predictions: {total_predictions}")
print(f"Total ground truth tags: {total_ground_truth}")
print()
print(f"=== SEMANTIC SIMILARITY METRICS ===")
print(f"Average Precision@10: {avg_precision_at_10:.4f}")
print(f"Average Recall: {avg_recall:.4f}")
print(f"Average F1 Score: {avg_f1:.4f}")
print(f"Average semantic hits per clip: {avg_semantic_hits_per_clip:.2f}")
print(f"Average exact hits per clip: {avg_exact_hits_per_clip:.2f}")
print()
print(f"Overall Precision: {overall_precision:.4f}")
print(f"Overall Recall: {overall_recall:.4f}")
print(f"Overall F1 Score: {overall_f1:.4f}")
print()
# The relative improvement is undefined when exact matching found nothing at all
if total_exact_hits > 0:
    print(f"Improvement over exact matching: {(total_semantic_hits / total_exact_hits - 1) * 100:.1f}% more hits")
else:
    print("Improvement over exact matching: n/a (no exact hits)")


In [None]:
# Print the five clips with the most semantic hits, with per-tag score details
print("=== Example DF-Weighted Results with SBERT Evaluation ===")

# Rank clips from most to fewest semantic hits (stable for ties)
results_sorted = list(results)
results_sorted.sort(key=lambda entry: entry['num_semantic_hits'], reverse=True)

for example_no, result in enumerate(results_sorted[:5], start=1):
    print(f"\n--- Example {example_no} (Sound ID: {result['sound_id']}) ---")
    print(f"Title: {result['title']}")
    print(f"Ground Truth Tags: {result['ground_truth_tags']}")
    print(f"Predicted Tags (with DF weights):")

    # One line per recommendation: W = weighted score, B = base similarity, DF = weight
    ranked_predictions = zip(
        result['predicted_tags'],
        result['prediction_scores'],
        result['base_scores'],
        result['df_weights'],
    )
    for rank, (tag, score, base_score, df_weight) in enumerate(ranked_predictions, 1):
        tag_key = tag.lower()
        semantic_hit_marker = "🔮" if tag_key in result['semantic_hits'] else " "
        exact_hit_marker = "★" if tag_key in result['exact_hits'] else " "
        print(f"  {rank:2d}.{semantic_hit_marker}{exact_hit_marker} {tag:<20} (W:{score:.4f}, B:{base_score:.4f}, DF:{df_weight:.3f})")

    print(f"\nSemantic Matches (threshold={SIMILARITY_THRESHOLD}):")
    for match in result['semantic_matches']:
        print(f"  '{match['predicted']}' ↔ '{match['matched_gt']}' (similarity: {match['similarity']:.3f})")

    print(f"\nExact hits: {result['exact_hits']} ({result['num_exact_hits']} hits)")
    print(f"Semantic hits: {result['semantic_hits']} ({result['num_semantic_hits']} hits)")
    print(f"Precision@10: {result['precision_at_10']:.3f}, Recall: {result['recall']:.3f}, F1: {result['f1_score']:.3f}")


In [None]:
# Distribution of semantic hits vs exact hits
# Each mapping is {number of hits: number of clips with that many hits}, stored as a
# plain dict in first-occurrence order.
semantic_hits_distribution = dict(Counter(result['num_semantic_hits'] for result in results))
exact_hits_distribution = dict(Counter(result['num_exact_hits'] for result in results))

print("\n=== Distribution Comparison: Semantic vs Exact Hits ===")
print("Hits | Semantic Counts | Exact Counts")
print("-----|-----------------|-------------")
# default=-1 keeps max() from raising ValueError when `results` is empty; the
# table then has no rows and the percentage division below is never reached.
max_hits = max(
    max(semantic_hits_distribution, default=-1),
    max(exact_hits_distribution, default=-1),
)
for hits in range(max_hits + 1):
    semantic_count = semantic_hits_distribution.get(hits, 0)
    exact_count = exact_hits_distribution.get(hits, 0)
    semantic_pct = semantic_count / len(results) * 100
    exact_pct = exact_count / len(results) * 100
    print(f"{hits:4d} | {semantic_count:4d} ({semantic_pct:4.1f}%) | {exact_count:4d} ({exact_pct:4.1f}%)")


In [None]:
# Convert numpy values to Python types for JSON serialization
def convert_numpy_types(obj):
    """
    Recursively replace numpy values with built-in Python equivalents.

    Conversions:
        - numpy bool / integer / floating scalars -> bool / int / float
        - numpy arrays -> nested lists (via ``tolist()``)
        - dicts, lists and tuples are rebuilt with converted contents; numpy
          scalars used as dict keys are converted too, because ``json.dump``
          rejects them as keys
    Anything else is returned unchanged.

    Args:
        obj: Arbitrary (possibly nested) value.

    Returns:
        A structure of the same shape without numpy scalars or arrays.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {
            # Only scalar keys are converted; other keys must stay hashable as they are
            (key.item() if isinstance(key, np.generic) else key): convert_numpy_types(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_types(item) for item in obj)
    return obj

# Strip numpy types from the per-clip results so json.dump accepts them
json_safe_results = convert_numpy_types(results)

# Both output files live in eval/ and share a stem that encodes the run parameters
os.makedirs('eval', exist_ok=True)
run_stem = f'eval/clap_baseline_df_sbert_alpha{ALPHA}_threshold{SIMILARITY_THRESHOLD}'
output_file = f'{run_stem}_results.json'
summary_file = f'{run_stem}_summary.json'

# Relative gain of semantic over exact matching, in percent (0.0 when there are no exact hits)
if total_exact_hits > 0:
    improvement_over_exact = float((total_semantic_hits / total_exact_hits - 1) * 100)
else:
    improvement_over_exact = 0.0

# Statistics of the DF weights assigned to the CLAP tags
df_stats = {
    'matched_tags': matched_tags,
    'unmatched_tags': unmatched_tags,
    'min_df_score': float(min(df_scores_values)),
    'max_df_score': float(max(df_scores_values)),
    'mean_df_score': float(np.mean(df_scores_values)),
    'std_df_score': float(np.std(df_scores_values))
}

# Run configuration plus aggregate metrics
summary = {
    'model': f'CLAP DF-Weighted with SBERT (Alpha={ALPHA}, Threshold={SIMILARITY_THRESHOLD})',
    'alpha': float(ALPHA),
    'sbert_threshold': float(SIMILARITY_THRESHOLD),
    'sbert_model': 'all-MiniLM-L6-v2',
    'num_clips': len(results),
    'total_semantic_hits': int(total_semantic_hits),
    'total_exact_hits': int(total_exact_hits),
    'total_predictions': int(total_predictions),
    'total_ground_truth': int(total_ground_truth),
    'avg_precision_at_10': float(avg_precision_at_10),
    'avg_recall': float(avg_recall),
    'avg_f1_score': float(avg_f1),
    'avg_semantic_hits_per_clip': float(avg_semantic_hits_per_clip),
    'avg_exact_hits_per_clip': float(avg_exact_hits_per_clip),
    'overall_precision': float(overall_precision),
    'overall_recall': float(overall_recall),
    'overall_f1_score': float(overall_f1),
    'semantic_hits_distribution': convert_numpy_types(semantic_hits_distribution),
    'exact_hits_distribution': convert_numpy_types(exact_hits_distribution),
    'improvement_over_exact': improvement_over_exact,
    'df_stats': df_stats
}

# Full report: summary plus every per-clip result
full_report = {
    'summary': summary,
    'detailed_results': json_safe_results
}
with open(output_file, 'w') as f:
    json.dump(full_report, f, indent=2)

print(f"Results saved to {output_file}")

# Compact report: summary only
with open(summary_file, 'w') as f:
    json.dump(summary, f, indent=2)

print(f"Summary saved to {summary_file}")
